Реализация Event Sourcing через брокер сообщений
Event Sourcing — это хранение состояния системы не как текущего снимка, а как последовательности событий. Вместо UPDATE orders SET status='shipped' в БД добавляется запись OrderShipped{orderId: 42, timestamp: ..., trackingCode: ...}. Текущее состояние вычисляется воспроизведением всех событий.
Брокер сообщений (Kafka, EventStoreDB) служит Event Log — неизменяемым журналом, из которого можно восстановить любое прошлое состояние.
Когда Event Sourcing оправдан
Event Sourcing добавляет сложности. Оправдан, когда:
- Нужен полный аудит-лог (финтех, медицина, e-commerce)
- Нужна возможность воспроизвести прошлое состояние на конкретный момент
- Несколько read-моделей из одного источника (CQRS)
- Undo/redo операции
Не оправдан для большинства CRUD-приложений без строгих требований к аудиту.
Event Store на базе Kafka
Kafka идеально подходит как Event Store: топики — это append-only логи, события упорядочены в пределах партиции, retention настраивается вплоть до log.retention.ms=-1 (навсегда).
Схема топиков:
-
orders-events— все события заказов (ключ = order_id, гарантирует порядок в партиции) -
users-events— события пользователей -
inventory-events— движение товарных остатков
// Базовый класс доменного события
public abstract class DomainEvent {
private final String eventId;
private final String aggregateId;
private final String aggregateType;
private final long version; // монотонно растущий номер версии агрегата
private final Instant occurredAt;
private final String causedBy; // ID команды, вызвавшей событие
// события неизменяемы
}
// Конкретные события
public class OrderCreated extends DomainEvent {
private final Long userId;
private final List<OrderItem> items;
private final BigDecimal totalAmount;
private final String currency;
}
public class OrderPaid extends DomainEvent {
private final String paymentId;
private final String paymentMethod;
private final BigDecimal amount;
}
public class OrderShipped extends DomainEvent {
private final String carrier;
private final String trackingCode;
private final Instant estimatedDelivery;
}
public class OrderCancelled extends DomainEvent {
private final String reason;
private final String cancelledBy; // "customer" | "system" | "support"
}
Агрегат с event sourcing
public class Order {
private Long id;
private Long userId;
private OrderStatus status;
private List<OrderItem> items;
private BigDecimal totalAmount;
private long version = 0;
private final List<DomainEvent> pendingEvents = new ArrayList<>();
// Статический метод восстановления из событий
public static Order reconstitute(List<DomainEvent> events) {
Order order = new Order();
for (DomainEvent event : events) {
order.apply(event);
}
return order;
}
// Команда: создать заказ
public void create(Long userId, List<OrderItem> items) {
if (this.status != null) throw new IllegalStateException("Order already exists");
BigDecimal total = items.stream()
.map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
OrderCreated event = new OrderCreated(
UUID.randomUUID().toString(),
String.valueOf(id),
"Order",
version + 1,
Instant.now(),
userId,
items,
total,
"USD"
);
apply(event);
pendingEvents.add(event);
}
public void pay(String paymentId, String method, BigDecimal amount) {
if (status != OrderStatus.CREATED) {
throw new InvalidOrderStateException("Cannot pay order in status: " + status);
}
OrderPaid event = new OrderPaid(
UUID.randomUUID().toString(),
String.valueOf(id),
"Order",
version + 1,
Instant.now(),
paymentId,
method,
amount
);
apply(event);
pendingEvents.add(event);
}
// apply — мутирует состояние без побочных эффектов
private void apply(DomainEvent event) {
version = event.getVersion();
if (event instanceof OrderCreated e) {
this.userId = e.getUserId();
this.items = e.getItems();
this.totalAmount = e.getTotalAmount();
this.status = OrderStatus.CREATED;
} else if (event instanceof OrderPaid) {
this.status = OrderStatus.PAID;
} else if (event instanceof OrderShipped e) {
this.status = OrderStatus.SHIPPED;
} else if (event instanceof OrderCancelled) {
this.status = OrderStatus.CANCELLED;
}
}
public List<DomainEvent> pullPendingEvents() {
List<DomainEvent> events = new ArrayList<>(pendingEvents);
pendingEvents.clear();
return events;
}
}
Event Store Repository
@Repository
public class OrderEventStoreRepository {
private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
private final KafkaConsumer<String, DomainEvent> replayConsumer;
private static final String TOPIC = "order-events";
public void save(Order order) {
List<DomainEvent> events = order.pullPendingEvents();
if (events.isEmpty()) return;
// Оптимистичная блокировка — версия события в заголовке
for (DomainEvent event : events) {
Headers headers = new RecordHeaders();
headers.add("aggregate-version", String.valueOf(event.getVersion()).getBytes());
headers.add("event-type", event.getClass().getSimpleName().getBytes());
ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
TOPIC,
null,
order.getId().toString(),
event,
headers
);
// Синхронная отправка для гарантии записи
kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
}
}
public Order load(Long orderId) {
// Воспроизводим все события для агрегата
List<DomainEvent> events = replayEvents(TOPIC, orderId.toString());
if (events.isEmpty()) throw new OrderNotFoundException(orderId);
return Order.reconstitute(events);
}
private List<DomainEvent> replayEvents(String topic, String aggregateId) {
// Читаем все партиции и фильтруем по ключу агрегата
// В реальном продукте: используем отдельный топик per-aggregate
// или EventStoreDB вместо Kafka для лучшей поддержки чтения по aggregate ID
List<DomainEvent> events = new ArrayList<>();
// ... реализация чтения по ключу
return events;
}
}
Проекции и Read Models
Из Event Stream строятся Read Models — денормализованные представления для конкретных запросов:
@Component
public class OrderReadModelProjection {
@Autowired
private OrderReadModelRepository readRepo;
@KafkaListener(topics = "order-events", groupId = "order-read-model-projector")
public void project(ConsumerRecord<String, DomainEvent> record) {
DomainEvent event = record.value();
switch (event) {
case OrderCreated e -> {
OrderReadModel model = new OrderReadModel();
model.setOrderId(Long.parseLong(e.getAggregateId()));
model.setUserId(e.getUserId());
model.setStatus("CREATED");
model.setTotalAmount(e.getTotalAmount());
model.setItemCount(e.getItems().size());
model.setCreatedAt(e.getOccurredAt());
readRepo.save(model);
}
case OrderPaid e -> readRepo.updateStatus(
Long.parseLong(e.getAggregateId()), "PAID", e.getOccurredAt()
);
case OrderShipped e -> readRepo.updateStatusWithTracking(
Long.parseLong(e.getAggregateId()), "SHIPPED",
e.getTrackingCode(), e.getEstimatedDelivery()
);
case OrderCancelled e -> readRepo.updateStatus(
Long.parseLong(e.getAggregateId()), "CANCELLED", e.getOccurredAt()
);
default -> {}
}
}
}
Snapshots — оптимизация воспроизведения
При тысячах событий на агрегат воспроизведение с начала занимает время. Снапшоты сохраняют состояние на определённой версии:
@Service
public class SnapshotService {
public void createSnapshotIfNeeded(Order order) {
if (order.getVersion() % 100 == 0) { // каждые 100 событий
OrderSnapshot snapshot = new OrderSnapshot(
order.getId(),
order.getVersion(),
objectMapper.writeValueAsString(order),
Instant.now()
);
snapshotRepo.save(snapshot);
}
}
public Order loadWithSnapshot(Long orderId) {
Optional<OrderSnapshot> snapshot = snapshotRepo.findLatest(orderId);
if (snapshot.isPresent()) {
Order order = objectMapper.readValue(snapshot.get().getState(), Order.class);
// Догружаем только события после версии снапшота
List<DomainEvent> newEvents = eventRepo.loadAfterVersion(
orderId, snapshot.get().getVersion()
);
for (DomainEvent event : newEvents) {
order.applyHistorical(event);
}
return order;
}
return eventRepo.load(orderId);
}
}
Таймлайн
День 1–2 — проектирование событийной модели: какие события, их поля, версионирование. Авро-схемы или Protobuf для каждого типа события.
День 3–4 — разработка агрегатов и Event Store Repository, реализация apply-методов, оптимистичная блокировка.
День 5–6 — реализация проекций для Read Models, тестирование воспроизведения.
День 7 — снапшоты, нагрузочное тестирование, оценка времени воспроизведения без снапшотов vs со снапшотами.
День 8 — мониторинг отставания проекций, инструмент для пересоздания Read Model из Event Log (full replay).







