Реализация паттерна Saga через очереди сообщений
Распределённые транзакции через двухфазный commit (2PC) не работают в микросервисной архитектуре — они создают синхронные зависимости и точки отказа. Паттерн Saga решает эту проблему: длинная транзакция разбивается на последовательность локальных транзакций, каждая из которых публикует событие для следующего шага. При ошибке выполняются компенсирующие транзакции в обратном порядке.
Два подхода к оркестрации Saga
Choreography (хореография) — каждый сервис реагирует на события и публикует свои. Нет центрального координатора. Хорошо для простых сценариев с 2–3 шагами. Сложно отлаживать при увеличении числа участников.
Orchestration (оркестрация) — выделенный Saga Orchestrator знает весь сценарий, отправляет команды каждому сервису и ждёт ответа. Проще отлаживать и мониторить. Рекомендуется для сложных сценариев.
Пример: Saga оформления заказа
Сценарий: CreateOrder → ReserveInventory → ProcessPayment → ShipOrder
При ошибке на любом шаге — компенсация в обратном порядке.
CreateOrder
↓ success
ReserveInventory
↓ success
ProcessPayment
↓ failure → CancelPayment
↓
ReleaseInventory
↓
CancelOrder
Реализация Orchestration Saga (Java/Spring)
// Состояние Saga хранится в БД — гарантирует восстановление после рестарта
@Entity
@Table(name = "order_sagas")
public class OrderSaga {
@Id
private String sagaId;
private Long orderId;
@Enumerated(EnumType.STRING)
private SagaStatus status; // STARTED, INVENTORY_RESERVED, PAYMENT_PROCESSING, COMPLETED, COMPENSATING, FAILED
@Enumerated(EnumType.STRING)
private SagaStep currentStep;
private String failureReason;
private int retryCount;
@Column(columnDefinition = "jsonb")
private String context; // JSON с данными для компенсации
}
@Service
@Transactional
public class OrderSagaOrchestrator {
@Autowired
private OrderSagaRepository sagaRepo;
@Autowired
private MessagePublisher publisher;
public void startSaga(CreateOrderCommand command) {
// Локальная транзакция: создаём заказ + сохраняем Saga
Order order = orderService.createDraft(command);
OrderSaga saga = new OrderSaga();
saga.setSagaId(UUID.randomUUID().toString());
saga.setOrderId(order.getId());
saga.setStatus(SagaStatus.STARTED);
saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
sagaRepo.save(saga);
// Публикуем команду для следующего шага
publisher.publish("inventory-commands", new ReserveInventoryCommand(
saga.getSagaId(),
order.getId(),
command.getItems()
));
}
@KafkaListener(topics = "inventory-events")
public void onInventoryEvent(InventoryEvent event) {
OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId())
.orElseThrow(() -> new IllegalStateException("Saga not found: " + event.getSagaId()));
if (event.getType() == EventType.INVENTORY_RESERVED) {
saga.setStatus(SagaStatus.INVENTORY_RESERVED);
saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
sagaRepo.save(saga);
publisher.publish("payment-commands", new ProcessPaymentCommand(
saga.getSagaId(),
saga.getOrderId(),
event.getReservationId()
));
} else if (event.getType() == EventType.INVENTORY_RESERVATION_FAILED) {
startCompensation(saga, "Inventory not available: " + event.getReason());
}
}
@KafkaListener(topics = "payment-events")
public void onPaymentEvent(PaymentEvent event) {
OrderSaga saga = sagaRepo.findBySagaId(event.getSagaId()).orElseThrow();
if (event.getType() == EventType.PAYMENT_COMPLETED) {
saga.setStatus(SagaStatus.COMPLETED);
saga.setCurrentStep(null);
sagaRepo.save(saga);
orderService.confirmOrder(saga.getOrderId());
publisher.publish("shipping-commands", new CreateShipmentCommand(
saga.getSagaId(), saga.getOrderId()
));
} else if (event.getType() == EventType.PAYMENT_FAILED) {
startCompensation(saga, "Payment failed: " + event.getErrorCode());
}
}
private void startCompensation(OrderSaga saga, String reason) {
saga.setStatus(SagaStatus.COMPENSATING);
saga.setFailureReason(reason);
sagaRepo.save(saga);
// Определяем, какие шаги нужно компенсировать на основе currentStep
switch (saga.getCurrentStep()) {
case PROCESS_PAYMENT:
// Инвентарь зарезервирован, платёж упал
publisher.publish("inventory-commands", new ReleaseInventoryCommand(
saga.getSagaId(), saga.getOrderId()
));
break;
case RESERVE_INVENTORY:
// Не успели зарезервировать — только отменяем заказ
orderService.cancelOrder(saga.getOrderId(), reason);
saga.setStatus(SagaStatus.FAILED);
sagaRepo.save(saga);
break;
}
}
}
Idempotency — защита от дублей
Каждый шаг Saga должен быть идемпотентным: повторная команда не должна создавать дубль транзакции.
@Service
public class InventoryService {
public void reserveInventory(ReserveInventoryCommand command) {
// Проверяем: может, уже резервировали для этой Saga?
Optional<InventoryReservation> existing =
reservationRepo.findBySagaId(command.getSagaId());
if (existing.isPresent()) {
// Идемпотентный ответ — публикуем то же событие
publisher.publish("inventory-events", new InventoryReservedEvent(
command.getSagaId(),
existing.get().getId()
));
return;
}
// Выполняем резервирование
try {
InventoryReservation reservation = performReservation(command);
publisher.publish("inventory-events", new InventoryReservedEvent(
command.getSagaId(), reservation.getId()
));
} catch (InsufficientInventoryException e) {
publisher.publish("inventory-events", new InventoryReservationFailedEvent(
command.getSagaId(), e.getMessage()
));
}
}
}
Реализация через RabbitMQ
Альтернатива Kafka — та же логика через RabbitMQ с topic exchange:
# Python-оркестратор
import pika
import json
import uuid
from enum import Enum
class SagaOrchestrator:
def __init__(self, connection):
self.channel = connection.channel()
self.channel.exchange_declare('saga-commands', 'topic', durable=True)
self.channel.exchange_declare('saga-events', 'topic', durable=True)
# Очередь для получения ответов от сервисов
result = self.channel.queue_declare('orchestrator-responses', durable=True)
self.channel.queue_bind(result.method.queue, 'saga-events', '#')
self.channel.basic_consume(
result.method.queue,
self.on_event,
auto_ack=False
)
def start_order_saga(self, order_data: dict) -> str:
saga_id = str(uuid.uuid4())
# Сохраняем Saga в БД (psycopg2/SQLAlchemy)
self.save_saga(saga_id, order_data['order_id'], 'STARTED', 'RESERVE_INVENTORY')
# Отправляем первую команду
self.channel.basic_publish(
exchange='saga-commands',
routing_key='inventory.reserve',
body=json.dumps({
'saga_id': saga_id,
'order_id': order_data['order_id'],
'items': order_data['items'],
}),
properties=pika.BasicProperties(
delivery_mode=2,
message_id=str(uuid.uuid4()),
correlation_id=saga_id,
)
)
return saga_id
def on_event(self, channel, method, properties, body):
event = json.loads(body)
saga = self.load_saga(event['saga_id'])
if event['type'] == 'INVENTORY_RESERVED':
self.proceed_to_payment(saga, event)
elif event['type'] in ('INVENTORY_FAILED', 'PAYMENT_FAILED'):
self.compensate(saga, event['reason'])
channel.basic_ack(method.delivery_tag)
Мониторинг и отладка Saga
Без видимости в состояния Saga отладка распределённых транзакций крайне сложна.
-- Зависшие Saga — не завершились за 30 минут
SELECT saga_id, order_id, status, current_step,
created_at, NOW() - created_at AS age
FROM order_sagas
WHERE status NOT IN ('COMPLETED', 'FAILED')
AND created_at < NOW() - INTERVAL '30 minutes'
ORDER BY created_at;
-- Статистика за день
SELECT status, COUNT(*), AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_duration_sec
FROM order_sagas
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY status;
Alert на зависшие Saga:
- alert: StuckSagas
expr: sum(order_sagas_stuck_count) > 0
for: 10m
annotations:
summary: "{{ $value }} order sagas stuck for more than 30 minutes"
Таймлайн
День 1–2 — проектирование Saga: определение шагов, компенсирующих действий, формата команд и событий. Схема БД для состояния Saga.
День 3–4 — разработка Orchestrator: обработчики событий, логика компенсации, идемпотентность команд.
День 5 — интеграция со всеми участвующими сервисами, реализация компенсирующих методов в каждом сервисе.
День 6 — тестирование сценариев отказа: убиваем каждый сервис на каждом шаге, проверяем корректность компенсации.
День 7 — мониторинг состояний Saga, алерты на зависшие транзакции, инструмент для ручного управления (resume/compensate/retry).







