Настройка Kafka Schema Registry для валидации сообщений

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка Kafka Schema Registry для валидации сообщений
Сложная
~2-3 рабочих дня
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка Kafka Schema Registry для валидации сообщений

Без Schema Registry Kafka-топики — это слепые байтовые потоки. Продюсер поменял формат JSON — консьюмер упал с NullPointerException. Schema Registry решает эту проблему: схема сообщения версионируется, эволюция контролируется, несовместимые изменения блокируются до публикации.

Используется совместно с форматами Avro, Protobuf или JSON Schema.

Архитектура

Schema Registry — отдельный HTTP-сервис, который хранит схемы в Kafka-топике _schemas. Продюсер при первой отправке регистрирует схему и получает schema_id (целое число). Вместо полной схемы в каждое сообщение вшивается только schema_id (4 байта) — это и есть wire format Confluent.

Producer → [magic byte 0x00][schema_id 4 bytes][serialized payload] → Kafka
Consumer → читает schema_id → запрашивает схему из Registry → десериализует

Установка Confluent Schema Registry

# Через Docker Compose (типичный сетап)
version: '3.8'
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "BACKWARD"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
      interval: 10s
      retries: 5

Для продуктива — минимум 2 экземпляра за балансировщиком, один является master.

Определение Avro-схемы

{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.orders",
  "doc": "Событие изменения заказа",
  "fields": [
    {
      "name": "event_id",
      "type": "string",
      "doc": "UUID события"
    },
    {
      "name": "order_id",
      "type": "long"
    },
    {
      "name": "user_id",
      "type": "long"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
      }
    },
    {
      "name": "amount",
      "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null,
      "doc": "Опциональные метаданные — новое поле, backward-compatible"
    }
  ]
}

Регистрация схемы через REST API

# Регистрируем схему для subject "order-events-value"
# Subject naming strategy: {topic}-value (дефолт) или кастомная

curl -X POST http://schema-registry:8081/subjects/order-events-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"status\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
  }'

# Ответ: {"id": 1}

# Получить все версии схемы
curl http://schema-registry:8081/subjects/order-events-value/versions

# Получить схему по версии
curl http://schema-registry:8081/subjects/order-events-value/versions/1

# Получить схему по ID
curl http://schema-registry:8081/schemas/ids/1

# Проверить совместимость новой схемы перед регистрацией
curl -X POST http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "..."}'
# Ответ: {"is_compatible": true}

Настройка режимов совместимости

# Глобальный режим
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Переопределение для конкретного subject
curl -X PUT http://schema-registry:8081/config/order-events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

Режимы:

  • BACKWARD — новая схема читает данные, записанные старой. Можно добавлять поля с default, удалять поля без default.
  • FORWARD — старая схема читает данные, записанные новой. Обратное.
  • FULL — оба направления. Только добавление/удаление optional-полей.
  • NONE — без проверок. Только для разработки.

Java-продюсер с Avro

<!-- pom.xml -->
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>
// Генерация Java-классов из Avro-схемы (через avro-maven-plugin)
// или использование GenericRecord для динамической схемы

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", false); // В проде — запрещаем авторегистрацию
props.put("use.latest.version", true);

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

OrderEvent event = OrderEvent.newBuilder()
    .setEventId(UUID.randomUUID().toString())
    .setOrderId(12345L)
    .setUserId(67890L)
    .setStatus(OrderStatus.CREATED)
    .setCreatedAt(Instant.now().toEpochMilli())
    .build();

producer.send(new ProducerRecord<>("order-events", event.getOrderId().toString(), event));

Python-клиент с Avro

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer

schema_registry_conf = {'url': 'http://schema-registry:8081'}

value_schema = avro.load('schemas/order_event.avsc')

producer = AvroProducer(
    {
        'bootstrap.servers': 'kafka-1:9092',
        'schema.registry.url': 'http://schema-registry:8081',
        'acks': 'all',
    },
    default_value_schema=value_schema
)

producer.produce(
    topic='order-events',
    key=str(order_id),
    value={
        'event_id': str(uuid.uuid4()),
        'order_id': order_id,
        'user_id': user_id,
        'status': 'CREATED',
        'amount': 1999.99,
        'created_at': int(time.time() * 1000),
    }
)
producer.flush()

Мониторинг Schema Registry

# Количество зарегистрированных subjects
curl -s http://schema-registry:8081/subjects | jq '. | length'

# Список всех subjects
curl -s http://schema-registry:8081/subjects | jq '.[]'

# Метрики через JMX или встроенный endpoint
curl http://schema-registry:8081/metrics

Prometheus: Schema Registry экспортирует метрики в формате Prometheus на /metrics. Важные: kafka_schema_registry_master_slave_role (должен быть один master), kafka_schema_registry_registered_count.

CI/CD интеграция

В pipeline перед деплоем новой версии сервиса — проверяем совместимость схемы:

#!/bin/bash
# schema-compatibility-check.sh

SCHEMA_FILE="src/main/avro/OrderEvent.avsc"
SUBJECT="order-events-value"
REGISTRY_URL="http://schema-registry:8081"

SCHEMA_JSON=$(jq -c . "$SCHEMA_FILE")
RESPONSE=$(curl -s -X POST \
    "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    -d "{\"schema\": $(echo $SCHEMA_JSON | jq -R .)}")

COMPATIBLE=$(echo $RESPONSE | jq -r '.is_compatible')

if [ "$COMPATIBLE" != "true" ]; then
    echo "FAIL: Schema is not compatible: $RESPONSE"
    exit 1
fi

echo "OK: Schema is backward compatible"

Таймлайн

День 1 — установка Schema Registry, определение Avro-схем для всех топиков, настройка compatibility mode.

День 2 — интеграция продюсеров и консьюмеров с KafkaAvroSerializer/Deserializer, тестирование wire format.

День 3 — интеграция проверки совместимости в CI/CD, документирование процесса эволюции схем для команды. Тест: намеренно сломанное несовместимое изменение должно падать в пайплайне до деплоя.