Реализация паттерна Saga через очереди сообщений

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Реализация паттерна Saga через очереди сообщений
Сложная
~5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Реализация паттерна Saga через очереди сообщений

Распределённые транзакции через двухфазный commit (2PC) не работают в микросервисной архитектуре — они создают синхронные зависимости и точки отказа. Паттерн Saga решает эту проблему: длинная транзакция разбивается на последовательность локальных транзакций, каждая из которых публикует событие для следующего шага. При ошибке выполняются компенсирующие транзакции в обратном порядке.

Два подхода к оркестрации Saga

Choreography (хореография) — каждый сервис реагирует на события и публикует свои. Нет центрального координатора. Хорошо для простых сценариев с 2–3 шагами. Сложно отлаживать при увеличении числа участников.

Orchestration (оркестрация) — выделенный Saga Orchestrator знает весь сценарий, отправляет команды каждому сервису и ждёт ответа. Проще отлаживать и мониторить. Рекомендуется для сложных сценариев.

Пример: Saga оформления заказа

Сценарий: CreateOrder → ReserveInventory → ProcessPayment → ShipOrder

При ошибке на любом шаге — компенсация в обратном порядке.

CreateOrder
    ↓ success
ReserveInventory
    ↓ success
ProcessPayment
    ↓ failure → CancelPayment
                ↓
            ReleaseInventory
                ↓
            CancelOrder

Реализация Orchestration Saga (Java/Spring)

// Состояние Saga хранится в БД — гарантирует восстановление после рестарта
@Entity
@Table(name = "order_sagas")
public class OrderSaga {
    @Id
    private String sagaId;
    private Long orderId;

    @Enumerated(EnumType.STRING)
    private SagaStatus status; // STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSING, COMPLETED, COMPENSATING, FAILED

    @Enumerated(EnumType.STRING)
    private SagaStep currentStep;

    private String failureReason;
    private int retryCount;

    @Column(columnDefinition = "jsonb")
    private String context; // JSON с данными для компенсации
}

@Service
@Transactional
public class OrderSagaOrchestrator {

    @Autowired
    private OrderSagaRepository sagaRepo;

    @Autowired
    private MessagePublisher publisher;

    public void startSaga(CreateOrderCommand command) {
        // Локальная транзакция: создаём заказ + сохраняем Saga
        Order order = orderService.createDraft(command);

        OrderSaga saga = new OrderSaga();
        saga.setSagaId(UUID.randomUUID().toString());
        saga.setOrderId(order.getId());
        saga.setStatus(SagaStatus.STARTED);
        saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
        sagaRepo.save(saga);

        // Публикуем команду для следующего шага
        publisher.publish("inventory-commands", new ReserveInventoryCommand(
            saga.getSagaId(),
            order.getId(),
            command.getItems()
        ));
    }

    @KafkaListener(topics = "inventory-events")
    public void onInventoryEvent(InventoryEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
            .orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));

        if (event.getType() == EventType.INVENTORY_RESERVED) {
            saga.setStatus(SagaStatus.INVENTORY_RESERVED);
            saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
            sagaRepo.save(saga);

            publisher.publish("payment-commands", new ProcessPaymentCommand(
                saga.getSagaId(),
                saga.getOrderId(),
                event.getReservationId()
            ));
        } else if (event.getType() == EventType.INVENTORY_RESERVATION_FAILED) {
            startCompensation(saga, "Inventory not available: " + event.getReason());
        }
    }

    @KafkaListener(topics = "payment-events")
    public void onPaymentEvent(PaymentEvent event) {
        OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId()).orElseThrow();

        if (event.getType() == EventType.PAYMENT_COMPLETED) {
            saga.setStatus(SagaStatus.COMPLETED);
            saga.setCurrentStep(null);
            sagaRepo.save(saga);

            orderService.confirmOrder(saga.getOrderId());
            publisher.publish("shipping-commands", new CreateShipmentCommand(
                saga.getSagaId(), saga.getOrderId()
            ));
        } else if (event.getType() == EventType.PAYMENT_FAILED) {
            startCompensation(saga, "Payment failed: " + event.getErrorCode());
        }
    }

    private void startCompensation(OrderSaga saga, String reason) {
        saga.setStatus(SagaStatus.COMPENSATING);
        saga.setFailureReason(reason);
        sagaRepo.save(saga);

        // Определяем, какие шаги нужно компенсировать на основе currentStep
        switch (saga.getCurrentStep()) {
            case PROCESS_PAYMENT:
                // Инвентарь зарезервирован, платёж упал
                publisher.publish("inventory-commands", new ReleaseInventoryCommand(
                    saga.getSagaId(), saga.getOrderId()
                ));
                break;
            case RESERVE_INVENTORY:
                // Не успели зарезервировать — только отменяем заказ
                orderService.cancelOrder(saga.getOrderId(), reason);
                saga.setStatus(SagaStatus.FAILED);
                sagaRepo.save(saga);
                break;
        }
    }
}

Idempotency — защита от дублей

Каждый шаг Saga должен быть идемпотентным: повторная команда не должна создавать дубль транзакции.

@Service
public class InventoryService {

    public void reserveInventory(ReserveInventoryCommand command) {
        // Проверяем: может, уже резервировали для этой Saga?
        Optional<InventoryReservation> existing =
            reservationRepo.findBySagaId(command.getSagaId());

        if (existing.isPresent()) {
            // Идемпотентный ответ — публикуем то же событие
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(),
                existing.get().getId()
            ));
            return;
        }

        // Выполняем резервирование
        try {
            InventoryReservation reservation = performReservation(command);
            publisher.publish("inventory-events", new InventoryReservedEvent(
                command.getSagaId(), reservation.getId()
            ));
        } catch (InsufficientInventoryException e) {
            publisher.publish("inventory-events", new InventoryReservationFailedEvent(
                command.getSagaId(), e.getMessage()
            ));
        }
    }
}

Реализация через RabbitMQ

Альтернатива Kafka — та же логика через RabbitMQ с topic exchange:

# Python-оркестратор
import pika
import json
import uuid
from enum import Enum

class SagaOrchestrator:
    def __init__(self, connection):
        self.channel = connection.channel()
        self.channel.exchange_declare('saga-commands', 'topic', durable=True)
        self.channel.exchange_declare('saga-events', 'topic', durable=True)

        # Очередь для получения ответов от сервисов
        result = self.channel.queue_declare('orchestrator-responses', durable=True)
        self.channel.queue_bind(result.method.queue, 'saga-events', '#')
        self.channel.basic_consume(
            result.method.queue,
            self.on_event,
            auto_ack=False
        )

    def start_order_saga(self, order_data: dict) -> str:
        saga_id = str(uuid.uuid4())

        # Сохраняем Saga в БД (psycopg2/SQLAlchemy)
        self.save_saga(saga_id, order_data['order_id'], 'STARTED', 'RESERVE_INVENTORY')

        # Отправляем первую команду
        self.channel.basic_publish(
            exchange='saga-commands',
            routing_key='inventory.reserve',
            body=json.dumps({
                'saga_id': saga_id,
                'order_id': order_data['order_id'],
                'items': order_data['items'],
            }),
            properties=pika.BasicProperties(
                delivery_mode=2,
                message_id=str(uuid.uuid4()),
                correlation_id=saga_id,
            )
        )
        return saga_id

    def on_event(self, channel, method, properties, body):
        event = json.loads(body)
        saga = self.load_saga(event['saga_id'])

        if event['type'] == 'INVENTORY_RESERVED':
            self.proceed_to_payment(saga, event)
        elif event['type'] in ('INVENTORY_FAILED', 'PAYMENT_FAILED'):
            self.compensate(saga, event['reason'])

        channel.basic_ack(method.delivery_tag)

Мониторинг и отладка Saga

Без видимости в состояния Saga отладка распределённых транзакций крайне сложна.

-- Зависшие Saga — не завершились за 30 минут
SELECT saga_id, order_id, status, current_step,
       created_at, NOW() - created_at AS age
FROM order_sagas
WHERE status NOT IN ('COMPLETED', 'FAILED')
  AND created_at < NOW() - INTERVAL '30 minutes'
ORDER BY created_at;

-- Статистика за день
SELECT status, COUNT(*), AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_sec
FROM order_sagas
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY status;

Alert на зависшие Saga:

- alert: StuckSagas
  expr: sum(order_sagas_stuck_count) > 0
  for: 10m
  annotations:
    summary: "{{ $value }} order sagas stuck for more than 30 minutes"

Таймлайн

День 1–2 — проектирование Saga: определение шагов, компенсирующих действий, формата команд и событий. Схема БД для состояния Saga.

День 3–4 — разработка Orchestrator: обработчики событий, логика компенсации, идемпотентность команд.

День 5 — интеграция со всеми участвующими сервисами, реализация компенсирующих методов в каждом сервисе.

День 6 — тестирование сценариев отказа: убиваем каждый сервис на каждом шаге, проверяем корректность компенсации.

День 7 — мониторинг состояний Saga, алерты на зависшие транзакции, инструмент для ручного управления (resume/compensate/retry).