Реализация Event-Driven Architecture для веб-приложения
Event-Driven Architecture (EDA) — архитектурный стиль, в котором компоненты системы взаимодействуют через публикацию и потребление событий. Продюсер публикует событие в брокер и не знает, кто его обработает. Консьюмеры подписываются на интересующие их события независимо. Это радикально снижает связность (coupling) между компонентами.
Когда EDA уместна
- Нужно уведомить несколько систем об одном событии (новый заказ → инвентаризация + уведомление + аналитика)
- Обработка занимает время и блокировать HTTP-запрос нецелесообразно
- Пиковая нагрузка, которую нужно сгладить через очередь
- Аудит и история изменений
- Интеграция с внешними системами через webhooks или CDC
Структура события
interface DomainEvent<T = unknown> {
id: string; // UUID — для идемпотентности
type: string; // 'user.registered', 'order.placed'
version: string; // '1.0' — для schema evolution
source: string; // 'order-service'
correlationId: string; // сквозной ID через все сервисы
causationId?: string; // ID события, ставшего причиной
occurredAt: string; // ISO 8601
data: T;
}
// Конкретное событие
interface OrderPlacedEvent extends DomainEvent<{
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
shippingAddress: Address;
}> {
type: 'order.placed';
}
Apache Kafka — основной брокер для EDA
import { Kafka, Partitioners } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: process.env.KAFKA_BROKERS.split(',')
});
// Продюсер
const producer = kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner
});
async function publishOrderPlaced(order: Order): Promise<void> {
await producer.send({
topic: 'order.events',
messages: [{
key: order.id, // партиционирование по ID заказа
value: JSON.stringify({
id: uuidv4(),
type: 'order.placed',
version: '1.0',
source: 'order-service',
correlationId: context.correlationId,
occurredAt: new Date().toISOString(),
data: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
total: order.total
}
} satisfies OrderPlacedEvent),
headers: {
'content-type': 'application/json',
'schema-version': '1.0'
}
}]
});
}
// Консьюмер — Inventory Service
const consumer = kafka.consumer({ groupId: 'inventory-service' });
await consumer.subscribe({ topics: ['order.events'], fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString()) as DomainEvent;
// Идемпотентность: проверяем, не обрабатывали ли уже это событие
const processed = await idempotencyRepo.exists(event.id);
if (processed) return;
try {
if (event.type === 'order.placed') {
await inventoryService.reserveStock(event.data.orderId, event.data.items);
}
await idempotencyRepo.mark(event.id);
} catch (error) {
// Публикуем в Dead Letter Topic для анализа
await deadLetterProducer.send({
topic: 'order.events.dlq',
messages: [{
value: message.value,
headers: { 'failure-reason': error.message }
}]
});
}
}
});
Outbox Pattern — гарантированная доставка
Ошибка: сохранить в БД и потом опубликовать в Kafka — риск потери события при сбое между шагами. Правильно — Transactional Outbox:
// В рамках одной транзакции БД
async function createOrder(dto: CreateOrderDto): Promise<Order> {
return db.transaction(async (trx) => {
// 1. Сохраняем заказ
const order = await trx('orders').insert({ ...orderData }).returning('*');
// 2. Сохраняем событие в outbox-таблицу (в той же транзакции!)
await trx('outbox_events').insert({
id: uuidv4(),
aggregate_id: order.id,
event_type: 'order.placed',
payload: JSON.stringify(orderPlacedEvent),
status: 'pending',
created_at: new Date()
});
return order;
});
}
Отдельный Outbox Poller читает pending-события и публикует их в Kafka:
// Cron job или background worker
async function processOutbox(): Promise<void> {
const events = await db('outbox_events')
.where({ status: 'pending' })
.orderBy('created_at')
.limit(100)
.forUpdate()
.skipLocked();
for (const event of events) {
try {
await kafka.producer.send({
topic: getTopicForEventType(event.event_type),
messages: [{ key: event.aggregate_id, value: event.payload }]
});
await db('outbox_events')
.where({ id: event.id })
.update({ status: 'published', published_at: new Date() });
} catch {
await db('outbox_events')
.where({ id: event.id })
.update({ retry_count: db.raw('retry_count + 1') });
}
}
}
Альтернатива — Debezium CDC: читает WAL PostgreSQL и публикует изменения в Kafka без дополнительного кода.
Choreography vs Orchestration
| Choreography | Orchestration | |
|---|---|---|
| Координация | Сервисы реагируют на события | Центральный оркестратор |
| Coupling | Низкий | Средний |
| Видимость потока | Сложно отследить | Явно в коде оркестратора |
| Тестирование | Сложнее | Проще |
Event Sourcing как частный случай EDA
При Event Sourcing все изменения состояния — события, которые публикуются и в event store, и в брокер. Проекции строятся на основе этих же событий. EDA и ES хорошо работают вместе.
Сроки реализации
- EDA для одного сценария (один продюсер + 2–3 консьюмера) — 1–2 недели
- Outbox Pattern + идемпотентность + DLQ — ещё 1 неделя
- Полная EDA для 5–10 сервисов с мониторингом — 4–8 недель







