Реализация Event Sourcing через брокер сообщений

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Реализация Event Sourcing через брокер сообщений
Сложная
~1-2 недели
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Реализация Event Sourcing через брокер сообщений

Event Sourcing — это хранение состояния системы не как текущего снимка, а как последовательности событий. Вместо UPDATE orders SET status='shipped' в БД добавляется запись OrderShipped{orderId: 42, timestamp: ..., trackingCode: ...}. Текущее состояние вычисляется воспроизведением всех событий.

Брокер сообщений (Kafka, EventStoreDB) служит Event Log — неизменяемым журналом, из которого можно восстановить любое прошлое состояние.

Когда Event Sourcing оправдан

Event Sourcing добавляет сложности. Оправдан, когда:

  • Нужен полный аудит-лог (финтех, медицина, e-commerce)
  • Нужна возможность воспроизвести прошлое состояние на конкретный момент
  • Несколько read-моделей из одного источника (CQRS)
  • Undo/redo операции

Не оправдан для большинства CRUD-приложений без строгих требований к аудиту.

Event Store на базе Kafka

Kafka идеально подходит как Event Store: топики — это append-only логи, события упорядочены в пределах партиции, retention настраивается вплоть до log.retention.ms=-1 (навсегда).

Схема топиков:

  • orders-events — все события заказов (ключ = order_id, гарантирует порядок в партиции)
  • users-events — события пользователей
  • inventory-events — движение товарных остатков
// Базовый класс доменного события
public abstract class DomainEvent {
    private final String eventId;
    private final String aggregateId;
    private final String aggregateType;
    private final long version;          // монотонно растущий номер версии агрегата
    private final Instant occurredAt;
    private final String causedBy;       // ID команды, вызвавшей событие

    // события неизменяемы
}

// Конкретные события
public class OrderCreated extends DomainEvent {
    private final Long userId;
    private final List<OrderItem> items;
    private final BigDecimal totalAmount;
    private final String currency;
}

public class OrderPaid extends DomainEvent {
    private final String paymentId;
    private final String paymentMethod;
    private final BigDecimal amount;
}

public class OrderShipped extends DomainEvent {
    private final String carrier;
    private final String trackingCode;
    private final Instant estimatedDelivery;
}

public class OrderCancelled extends DomainEvent {
    private final String reason;
    private final String cancelledBy; // "customer" | "system" | "support"
}

Агрегат с event sourcing

public class Order {
    private Long id;
    private Long userId;
    private OrderStatus status;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private long version = 0;

    private final List<DomainEvent> pendingEvents = new ArrayList<>();

    // Статический метод восстановления из событий
    public static Order reconstitute(List<DomainEvent> events) {
        Order order = new Order();
        for (DomainEvent event : events) {
            order.apply(event);
        }
        return order;
    }

    // Команда: создать заказ
    public void create(Long userId, List<OrderItem> items) {
        if (this.status != null) throw new IllegalStateException("Order already exists");

        BigDecimal total = items.stream()
            .map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);

        OrderCreated event = new OrderCreated(
            UUID.randomUUID().toString(),
            String.valueOf(id),
            "Order",
            version + 1,
            Instant.now(),
            userId,
            items,
            total,
            "USD"
        );

        apply(event);
        pendingEvents.add(event);
    }

    public void pay(String paymentId, String method, BigDecimal amount) {
        if (status != OrderStatus.CREATED) {
            throw new InvalidOrderStateException("Cannot pay order in status: " + status);
        }

        OrderPaid event = new OrderPaid(
            UUID.randomUUID().toString(),
            String.valueOf(id),
            "Order",
            version + 1,
            Instant.now(),
            paymentId,
            method,
            amount
        );

        apply(event);
        pendingEvents.add(event);
    }

    // apply — мутирует состояние без побочных эффектов
    private void apply(DomainEvent event) {
        version = event.getVersion();

        if (event instanceof OrderCreated e) {
            this.userId = e.getUserId();
            this.items = e.getItems();
            this.totalAmount = e.getTotalAmount();
            this.status = OrderStatus.CREATED;
        } else if (event instanceof OrderPaid) {
            this.status = OrderStatus.PAID;
        } else if (event instanceof OrderShipped e) {
            this.status = OrderStatus.SHIPPED;
        } else if (event instanceof OrderCancelled) {
            this.status = OrderStatus.CANCELLED;
        }
    }

    public List<DomainEvent> pullPendingEvents() {
        List<DomainEvent> events = new ArrayList<>(pendingEvents);
        pendingEvents.clear();
        return events;
    }
}

Event Store Repository

@Repository
public class OrderEventStoreRepository {
    private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
    private final KafkaConsumer<String, DomainEvent> replayConsumer;
    private static final String TOPIC = "order-events";

    public void save(Order order) {
        List<DomainEvent> events = order.pullPendingEvents();
        if (events.isEmpty()) return;

        // Оптимистичная блокировка — версия события в заголовке
        for (DomainEvent event : events) {
            Headers headers = new RecordHeaders();
            headers.add("aggregate-version", String.valueOf(event.getVersion()).getBytes());
            headers.add("event-type", event.getClass().getSimpleName().getBytes());

            ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
                TOPIC,
                null,
                order.getId().toString(),
                event,
                headers
            );

            // Синхронная отправка для гарантии записи
            kafkaTemplate.send(record).get(5, TimeUnit.SECONDS);
        }
    }

    public Order load(Long orderId) {
        // Воспроизводим все события для агрегата
        List<DomainEvent> events = replayEvents(TOPIC, orderId.toString());
        if (events.isEmpty()) throw new OrderNotFoundException(orderId);
        return Order.reconstitute(events);
    }

    private List<DomainEvent> replayEvents(String topic, String aggregateId) {
        // Читаем все партиции и фильтруем по ключу агрегата
        // В реальном продукте: используем отдельный топик per-aggregate
        // или EventStoreDB вместо Kafka для лучшей поддержки чтения по aggregate ID
        List<DomainEvent> events = new ArrayList<>();
        // ... реализация чтения по ключу
        return events;
    }
}

Проекции и Read Models

Из Event Stream строятся Read Models — денормализованные представления для конкретных запросов:

@Component
public class OrderReadModelProjection {

    @Autowired
    private OrderReadModelRepository readRepo;

    @KafkaListener(topics = "order-events", groupId = "order-read-model-projector")
    public void project(ConsumerRecord<String, DomainEvent> record) {
        DomainEvent event = record.value();

        switch (event) {
            case OrderCreated e -> {
                OrderReadModel model = new OrderReadModel();
                model.setOrderId(Long.parseLong(e.getAggregateId()));
                model.setUserId(e.getUserId());
                model.setStatus("CREATED");
                model.setTotalAmount(e.getTotalAmount());
                model.setItemCount(e.getItems().size());
                model.setCreatedAt(e.getOccurredAt());
                readRepo.save(model);
            }
            case OrderPaid e -> readRepo.updateStatus(
                Long.parseLong(e.getAggregateId()), "PAID", e.getOccurredAt()
            );
            case OrderShipped e -> readRepo.updateStatusWithTracking(
                Long.parseLong(e.getAggregateId()), "SHIPPED",
                e.getTrackingCode(), e.getEstimatedDelivery()
            );
            case OrderCancelled e -> readRepo.updateStatus(
                Long.parseLong(e.getAggregateId()), "CANCELLED", e.getOccurredAt()
            );
            default -> {}
        }
    }
}

Snapshots — оптимизация воспроизведения

При тысячах событий на агрегат воспроизведение с начала занимает время. Снапшоты сохраняют состояние на определённой версии:

@Service
public class SnapshotService {

    public void createSnapshotIfNeeded(Order order) {
        if (order.getVersion() % 100 == 0) { // каждые 100 событий
            OrderSnapshot snapshot = new OrderSnapshot(
                order.getId(),
                order.getVersion(),
                objectMapper.writeValueAsString(order),
                Instant.now()
            );
            snapshotRepo.save(snapshot);
        }
    }

    public Order loadWithSnapshot(Long orderId) {
        Optional<OrderSnapshot> snapshot = snapshotRepo.findLatest(orderId);

        if (snapshot.isPresent()) {
            Order order = objectMapper.readValue(snapshot.get().getState(), Order.class);
            // Догружаем только события после версии снапшота
            List<DomainEvent> newEvents = eventRepo.loadAfterVersion(
                orderId, snapshot.get().getVersion()
            );
            for (DomainEvent event : newEvents) {
                order.applyHistorical(event);
            }
            return order;
        }

        return eventRepo.load(orderId);
    }
}

Таймлайн

День 1–2 — проектирование событийной модели: какие события, их поля, версионирование. Авро-схемы или Protobuf для каждого типа события.

День 3–4 — разработка агрегатов и Event Store Repository, реализация apply-методов, оптимистичная блокировка.

День 5–6 — реализация проекций для Read Models, тестирование воспроизведения.

День 7 — снапшоты, нагрузочное тестирование, оценка времени воспроизведения без снапшотов vs со снапшотами.

День 8 — мониторинг отставания проекций, инструмент для пересоздания Read Model из Event Log (full replay).