Разработка схемы событий (Event Schema) для микросервисов
Контракт между микросервисами через очереди — это Event Schema. Нет строгой схемы — есть хаос: продюсер переименовал поле, консьюмер упал в 3:00 ночи. Правильно спроектированная схема событий с версионированием делает изменения явными и контролируемыми.
Принципы проектирования Event Schema
События описывают факты, не команды. OrderShipped — это факт. ShipOrder — это команда. Событие произошло и не может быть отменено (только компенсировано другим событием).
Схема должна быть самодостаточной. Консьюмер не должен делать дополнительных запросов для обработки события. Все нужные данные — в теле события.
Обратная совместимость по умолчанию. Старые консьюмеры должны работать с новыми событиями без изменений.
Структура события
{
"eventId": "01HQ2XK4VB8M9QXYZ123456789",
"eventType": "order.shipped",
"eventVersion": "1.2",
"occurredAt": "2026-03-28T14:22:00.000Z",
"producedBy": "order-service",
"correlationId": "req-abc-123",
"causationId": "cmd-xyz-456",
"aggregateType": "Order",
"aggregateId": "12345",
"aggregateVersion": 7,
"payload": {
"orderId": 12345,
"userId": 67890,
"carrier": "DHL",
"trackingCode": "JD123456789DE",
"estimatedDelivery": "2026-03-31",
"items": [
{"sku": "PROD-001", "quantity": 2, "warehouseId": "WH-MSK"}
]
}
}
Обязательные поля конверта:
-
eventId— ULID или UUID, уникальный идентификатор для идемпотентности -
eventType— иерархический,domain.aggregate.action -
eventVersion— semantic versioning схемы payload -
occurredAt— UTC ISO 8601 -
correlationId— для трассировки цепочки запросов -
aggregateId+aggregateVersion— для оптимистичной блокировки
Avro-схема с эволюцией
{
"type": "record",
"name": "OrderShipped",
"namespace": "com.example.orders.events",
"doc": "Событие отгрузки заказа со склада",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "eventType", "type": "string", "default": "order.shipped"},
{"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "orderId", "type": "long"},
{"name": "userId", "type": "long"},
{"name": "carrier", "type": "string"},
{"name": "trackingCode", "type": "string"},
{
"name": "estimatedDelivery",
"type": ["null", "string"],
"default": null,
"doc": "ISO date, может отсутствовать для некоторых перевозчиков"
},
{
"name": "warehouseId",
"type": ["null", "string"],
"default": null,
"doc": "Добавлено в v1.1 — необязательное поле для backward compatibility"
},
{
"name": "shippingCost",
"type": ["null", {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}],
"default": null,
"doc": "Добавлено в v1.2"
}
]
}
Правила эволюции для backward compatibility:
- Новые поля — всегда с
default(null или значение) - Нельзя удалять обязательные поля
- Нельзя менять тип поля
- Нельзя переименовывать поля (добавьте alias, потом через мажорную версию переименуйте)
Версионирование и стратегии совместимости
# Настройка Schema Registry — BACKWARD совместимость для всех событий orders
curl -X PUT http://schema-registry:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD_TRANSITIVE"}'
# BACKWARD_TRANSITIVE — новая схема совместима со ВСЕМИ предыдущими версиями,
# не только с последней
Мажорное изменение (breaking change) — новый топик:
order-events-v1 → для консьюмеров на старой схеме
order-events-v2 → новая схема, консьюмеры мигрируют постепенно
Переходный период: продюсер публикует в оба топика. После полной миграции — order-events-v1 deprecated.
Event Catalog — документирование схем
Для команды из нескольких сервисов критично иметь центральный реестр событий. Используем AsyncAPI:
# asyncapi.yaml
asyncapi: 3.0.0
info:
title: Order Service Events
version: 1.0.0
description: События, публикуемые Order Service
channels:
order-events:
address: order-events
messages:
OrderCreated:
$ref: '#/components/messages/OrderCreated'
OrderShipped:
$ref: '#/components/messages/OrderShipped'
OrderCancelled:
$ref: '#/components/messages/OrderCancelled'
components:
messages:
OrderCreated:
name: OrderCreated
title: Заказ создан
summary: Публикуется при успешном создании нового заказа
contentType: application/avro
headers:
type: object
properties:
correlationId:
type: string
description: ID входящего HTTP-запроса
payload:
type: object
required: [eventId, orderId, userId, items, totalAmount]
properties:
eventId:
type: string
format: ulid
orderId:
type: integer
format: int64
userId:
type: integer
format: int64
items:
type: array
items:
type: object
properties:
sku:
type: string
quantity:
type: integer
price:
type: number
totalAmount:
type: number
createdAt:
type: string
format: date-time
Типизированный Event Publisher (TypeScript/Node.js)
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { Kafka } from 'kafkajs';
interface EventEnvelope<T> {
eventId: string;
eventType: string;
eventVersion: string;
occurredAt: string;
producedBy: string;
correlationId?: string;
aggregateType: string;
aggregateId: string;
aggregateVersion: number;
payload: T;
}
interface OrderShippedPayload {
orderId: number;
userId: number;
carrier: string;
trackingCode: string;
estimatedDelivery?: string;
}
class OrderEventPublisher {
private registry: SchemaRegistry;
private producer: ReturnType<Kafka['producer']>;
async publishOrderShipped(data: OrderShippedPayload, correlationId?: string): Promise<void> {
const envelope: EventEnvelope<OrderShippedPayload> = {
eventId: ulid(),
eventType: 'order.shipped',
eventVersion: '1.2',
occurredAt: new Date().toISOString(),
producedBy: 'order-service',
correlationId,
aggregateType: 'Order',
aggregateId: String(data.orderId),
aggregateVersion: await this.getAggregateVersion(data.orderId),
payload: data,
};
const schemaId = await this.registry.getLatestSchemaId('order-events-value');
const encoded = await this.registry.encode(schemaId, envelope);
await this.producer.send({
topic: 'order-events',
messages: [{
key: String(data.orderId),
value: encoded,
headers: {
'correlation-id': correlationId ?? '',
'event-type': 'order.shipped',
},
}],
});
}
}
Тестирование Event Schema
// Contract testing — проверяем, что продюсер публикует то, что консьюмер ожидает
@SpringBootTest
class OrderEventContractTest {
@Test
void orderShippedEvent_shouldMatchConsumerExpectations() throws Exception {
OrderShipped event = OrderShipped.newBuilder()
.setEventId(UUID.randomUUID().toString())
.setOrderId(12345L)
.setUserId(67890L)
.setCarrier("DHL")
.setTrackingCode("JD123456789DE")
.build();
// Сериализуем с Avro
byte[] serialized = avroSerializer.serialize("order-events", event);
// Десериализуем как консьюмер (другой сервис)
OrderShipped deserialized = (OrderShipped) avroDeserializer.deserialize("order-events", serialized);
assertThat(deserialized.getOrderId()).isEqualTo(12345L);
assertThat(deserialized.getTrackingCode()).isEqualTo("JD123456789DE");
// Проверяем обратную совместимость: старый консьюмер без поля warehouseId
OldOrderShipped oldDeserialized = (OldOrderShipped) oldDeserializer.deserialize("order-events", serialized);
assertThat(oldDeserialized.getOrderId()).isEqualTo(12345L);
// warehouseId отсутствует — не падаем
}
}
Таймлайн
День 1 — воркшоп с командами сервисов: составляем Event Storming карту, определяем все доменные события и их границы.
День 2 — разработка Avro-схем для каждого типа события, определение правил именования и структуры конверта. Регистрация в Schema Registry.
День 3 — реализация типизированных Event Publisher'ов в каждом сервисе-продюсере, AsyncAPI-документация.
День 4 — contract тесты, интеграция проверки совместимости в CI/CD pipeline, инструкция для команды по правилам эволюции схем.







