Настройка Kafka Schema Registry для валидации сообщений
Без Schema Registry Kafka-топики — это слепые байтовые потоки. Продюсер поменял формат JSON — консьюмер упал с NullPointerException. Schema Registry решает эту проблему: схема сообщения версионируется, эволюция контролируется, несовместимые изменения блокируются до публикации.
Используется совместно с форматами Avro, Protobuf или JSON Schema.
Архитектура
Schema Registry — отдельный HTTP-сервис, который хранит схемы в Kafka-топике _schemas. Продюсер при первой отправке регистрирует схему и получает schema_id (целое число). Вместо полной схемы в каждое сообщение вшивается только schema_id (4 байта) — это и есть wire format Confluent.
Producer → [magic byte 0x00][schema_id 4 bytes][serialized payload] → Kafka
Consumer → читает schema_id → запрашивает схему из Registry → десериализует
Установка Confluent Schema Registry
# Через Docker Compose (типичный сетап)
version: '3.8'
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas"
SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 3
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: "BACKWARD"
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8081/subjects"]
interval: 10s
retries: 5
Для продуктива — минимум 2 экземпляра за балансировщиком, один является master.
Определение Avro-схемы
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example.orders",
"doc": "Событие изменения заказа",
"fields": [
{
"name": "event_id",
"type": "string",
"doc": "UUID события"
},
{
"name": "order_id",
"type": "long"
},
{
"name": "user_id",
"type": "long"
},
{
"name": "status",
"type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["CREATED", "PAID", "SHIPPED", "DELIVERED", "CANCELLED"]
}
},
{
"name": "amount",
"type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}
},
{
"name": "created_at",
"type": {"type": "long", "logicalType": "timestamp-millis"}
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null,
"doc": "Опциональные метаданные — новое поле, backward-compatible"
}
]
}
Регистрация схемы через REST API
# Регистрируем схему для subject "order-events-value"
# Subject naming strategy: {topic}-value (дефолт) или кастомная
curl -X POST http://schema-registry:8081/subjects/order-events-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"OrderEvent\",\"namespace\":\"com.example.orders\",\"fields\":[{\"name\":\"event_id\",\"type\":\"string\"},{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"status\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}"
}'
# Ответ: {"id": 1}
# Получить все версии схемы
curl http://schema-registry:8081/subjects/order-events-value/versions
# Получить схему по версии
curl http://schema-registry:8081/subjects/order-events-value/versions/1
# Получить схему по ID
curl http://schema-registry:8081/schemas/ids/1
# Проверить совместимость новой схемы перед регистрацией
curl -X POST http://schema-registry:8081/compatibility/subjects/order-events-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "..."}'
# Ответ: {"is_compatible": true}
Настройка режимов совместимости
# Глобальный режим
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Переопределение для конкретного subject
curl -X PUT http://schema-registry:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
Режимы:
-
BACKWARD— новая схема читает данные, записанные старой. Можно добавлять поля с default, удалять поля без default. -
FORWARD— старая схема читает данные, записанные новой. Обратное. -
FULL— оба направления. Только добавление/удаление optional-полей. -
NONE— без проверок. Только для разработки.
Java-продюсер с Avro
<!-- pom.xml -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
// Генерация Java-классов из Avro-схемы (через avro-maven-plugin)
// или использование GenericRecord для динамической схемы
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", false); // В проде — запрещаем авторегистрацию
props.put("use.latest.version", true);
KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);
OrderEvent event = OrderEvent.newBuilder()
.setEventId(UUID.randomUUID().toString())
.setOrderId(12345L)
.setUserId(67890L)
.setStatus(OrderStatus.CREATED)
.setCreatedAt(Instant.now().toEpochMilli())
.build();
producer.send(new ProducerRecord<>("order-events", event.getOrderId().toString(), event));
Python-клиент с Avro
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
schema_registry_conf = {'url': 'http://schema-registry:8081'}
value_schema = avro.load('schemas/order_event.avsc')
producer = AvroProducer(
{
'bootstrap.servers': 'kafka-1:9092',
'schema.registry.url': 'http://schema-registry:8081',
'acks': 'all',
},
default_value_schema=value_schema
)
producer.produce(
topic='order-events',
key=str(order_id),
value={
'event_id': str(uuid.uuid4()),
'order_id': order_id,
'user_id': user_id,
'status': 'CREATED',
'amount': 1999.99,
'created_at': int(time.time() * 1000),
}
)
producer.flush()
Мониторинг Schema Registry
# Количество зарегистрированных subjects
curl -s http://schema-registry:8081/subjects | jq '. | length'
# Список всех subjects
curl -s http://schema-registry:8081/subjects | jq '.[]'
# Метрики через JMX или встроенный endpoint
curl http://schema-registry:8081/metrics
Prometheus: Schema Registry экспортирует метрики в формате Prometheus на /metrics. Важные: kafka_schema_registry_master_slave_role (должен быть один master), kafka_schema_registry_registered_count.
CI/CD интеграция
В pipeline перед деплоем новой версии сервиса — проверяем совместимость схемы:
#!/bin/bash
# schema-compatibility-check.sh
SCHEMA_FILE="src/main/avro/OrderEvent.avsc"
SUBJECT="order-events-value"
REGISTRY_URL="http://schema-registry:8081"
SCHEMA_JSON=$(jq -c . "$SCHEMA_FILE")
RESPONSE=$(curl -s -X POST \
"${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": $(echo $SCHEMA_JSON | jq -R .)}")
COMPATIBLE=$(echo $RESPONSE | jq -r '.is_compatible')
if [ "$COMPATIBLE" != "true" ]; then
echo "FAIL: Schema is not compatible: $RESPONSE"
exit 1
fi
echo "OK: Schema is backward compatible"
Таймлайн
День 1 — установка Schema Registry, определение Avro-схем для всех топиков, настройка compatibility mode.
День 2 — интеграция продюсеров и консьюмеров с KafkaAvroSerializer/Deserializer, тестирование wire format.
День 3 — интеграция проверки совместимости в CI/CD, документирование процесса эволюции схем для команды. Тест: намеренно сломанное несовместимое изменение должно падать в пайплайне до деплоя.







