Реализация Saga Pattern для распределённых транзакций
В микросервисной архитектуре нет возможности использовать ACID-транзакции через границы сервисов. Saga Pattern решает задачу распределённой согласованности: бизнес-транзакция разбивается на последовательность локальных транзакций в каждом сервисе, а при сбое выполняются компенсирующие транзакции для отката.
Два вида Saga
Choreography (хореография) — сервисы реагируют на события друг друга без центрального координатора:
OrderService InventoryService PaymentService ShippingService
│ │ │ │
│── OrderCreated ───►│ │ │
│ │── StockReserved ──►│ │
│ │ │── PaymentProcessed►│
│ │ │ │── ShipmentCreated
│ │ │ │
│ (при ошибке оплаты) │ │
│ │◄── StockReleased ──│ │
Orchestration (оркестрация) — центральный Saga Orchestrator явно управляет шагами:
class CreateOrderSaga {
async execute(context: OrderSagaContext): Promise<void> {
try {
// Шаг 1: Зарезервировать товар
const reservation = await this.inventoryService.reserveStock(
context.orderId, context.items
);
context.reservationId = reservation.id;
// Шаг 2: Списать оплату
const payment = await this.paymentService.charge(
context.customerId, context.totalAmount
);
context.paymentId = payment.id;
// Шаг 3: Создать отгрузку
await this.shippingService.createShipment(
context.orderId, context.shippingAddress
);
// Шаг 4: Подтвердить заказ
await this.orderService.confirmOrder(context.orderId);
} catch (error) {
await this.compensate(context, error);
throw new SagaFailedError(context.orderId, error);
}
}
async compensate(context: OrderSagaContext, failedAt: Error): Promise<void> {
// Компенсации выполняются в обратном порядке
if (context.paymentId) {
await this.paymentService.refund(context.paymentId)
.catch(e => this.logger.error('Refund failed', e));
}
if (context.reservationId) {
await this.inventoryService.releaseReservation(context.reservationId)
.catch(e => this.logger.error('Release failed', e));
}
await this.orderService.cancelOrder(context.orderId, 'Saga compensation');
}
}
Персистентная Saga с состоянием
Сага должна переживать рестарты сервиса. Состояние хранится в БД:
interface SagaState {
sagaId: string;
sagaType: string;
status: 'running' | 'completed' | 'failed' | 'compensating';
currentStep: number;
context: Record<string, unknown>;
completedSteps: string[];
failedStep?: string;
createdAt: Date;
updatedAt: Date;
}
class PersistentSagaOrchestrator {
async startSaga(sagaType: string, context: unknown): Promise<string> {
const sagaId = uuidv4();
await this.sagaRepo.save({
sagaId, sagaType, status: 'running',
currentStep: 0, context, completedSteps: []
});
await this.executeSaga(sagaId);
return sagaId;
}
async resumeSaga(sagaId: string): Promise<void> {
const state = await this.sagaRepo.findById(sagaId);
if (!state || state.status !== 'running') return;
// Возобновляем с незавершённого шага
await this.executeSaga(sagaId, state.currentStep);
}
}
Temporal.io для оркестрации саг
Temporal — production-ready движок для долгоживущих workflow (включая саги):
import { proxyActivities, sleep } from '@temporalio/workflow';
const { reserveStock, chargePayment, createShipment, releaseStock, refund } =
proxyActivities({ startToCloseTimeout: '10 seconds' });
export async function createOrderWorkflow(input: CreateOrderInput): Promise<void> {
let stockReserved = false;
let paymentCharged = false;
try {
await reserveStock({ orderId: input.orderId, items: input.items });
stockReserved = true;
await chargePayment({ orderId: input.orderId, amount: input.amount });
paymentCharged = true;
await createShipment({ orderId: input.orderId, address: input.address });
} catch (error) {
// Temporal гарантирует выполнение компенсаций
if (paymentCharged) {
await refund({ orderId: input.orderId });
}
if (stockReserved) {
await releaseStock({ orderId: input.orderId });
}
throw error;
}
}
Temporal автоматически retry activities, сохраняет историю выполнения, позволяет инспектировать и отлаживать workflow через UI.
Хореография через Kafka
// Order Service публикует событие
await kafka.producer.send({
topic: 'order.events',
messages: [{ key: orderId, value: JSON.stringify({
type: 'OrderCreated', orderId, items, customerId
})}]
});
// Inventory Service слушает и резервирует
kafka.consumer.subscribe({ topic: 'order.events' });
kafka.consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
if (event.type !== 'OrderCreated') return;
try {
await inventoryService.reserveStock(event.orderId, event.items);
// Публикуем успех
await kafka.producer.send({
topic: 'inventory.events',
messages: [{ key: event.orderId, value: JSON.stringify({
type: 'StockReserved', orderId: event.orderId
})}]
});
} catch {
// Публикуем неудачу — Order Service откатится
await kafka.producer.send({
topic: 'inventory.events',
messages: [{ key: event.orderId, value: JSON.stringify({
type: 'StockReservationFailed', orderId: event.orderId
})}]
});
}
}
});
Идемпотентность — обязательное требование
Каждая операция в саге должна быть идемпотентной: повторный вызов не создаёт дубликаты.
async function reserveStock(orderId: string, items: Item[]): Promise<Reservation> {
// Проверяем, не была ли уже создана резервация для этого заказа
const existing = await reservationRepo.findByOrderId(orderId);
if (existing) return existing; // идемпотентно
return reservationRepo.create({ orderId, items, status: 'reserved' });
}
Сроки реализации
- Сага с оркестрацией (2–3 сервиса, без Temporal) — 1–2 недели
- Сага с Temporal + мониторинг состояний — 2–3 недели
- Хореография через Kafka с идемпотентными обработчиками — 2–4 недели







