Разработка Workflow-движка на базе Temporal
Temporal — платформа для надёжного выполнения длительных бизнес-процессов. Workflow-функции в Temporal выглядят как обычный код, но Temporal автоматически сохраняет их состояние, обеспечивает retry при сбоях и позволяет workflow переживать рестарты серверов.
Чем Temporal отличается от очередей
В очереди сообщений (RabbitMQ, Kafka) при сбое между шагами нужно самостоятельно управлять состоянием, dead letter queue, retry-логикой. В Temporal workflow-функция «засыпает» и «просыпается» — движок гарантирует, что она выполнится до конца.
Ключевые концепции
Workflow — детерминированная функция, определяющая порядок шагов. Может «спать» часами/днями, ждать сигналов.
Activity — отдельный шаг с side effects (HTTP-запрос, запись в БД). Activities имеют retry-политику.
Worker — процесс, который выполняет Workflow и Activity код.
Signal — внешнее событие, которое может изменить состояние workflow (например, «платёж подтверждён»).
Query — чтение текущего состояния workflow без изменения.
Установка Temporal Server
# docker-compose.yml
services:
temporal:
image: temporalio/auto-setup:1.22
ports:
- "7233:7233"
environment:
- DB=postgresql
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgresql
depends_on:
- postgresql
temporal-ui:
image: temporalio/ui:2.22
ports:
- "8080:8080"
environment:
- TEMPORAL_ADDRESS=temporal:7233
postgresql:
image: postgres:15-alpine
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
POSTGRES_DB: temporal
Workflow — Node.js SDK
import { defineActivity, defineWorkflow, proxyActivities, sleep, setHandler, defineSignal, defineQuery } from '@temporalio/workflow';
// Определяем activities интерфейс
const { validateOrder, reserveInventory, processPayment,
sendConfirmation, releaseInventory, refundPayment } =
proxyActivities<typeof import('./activities')>({
startToCloseTimeout: '30 seconds',
retry: {
maximumAttempts: 3,
initialInterval: '1 second',
backoffCoefficient: 2,
}
});
// Signals и Queries
const paymentConfirmedSignal = defineSignal<[{ paymentId: string }]>('paymentConfirmed');
const cancelOrderSignal = defineSignal<[{ reason: string }]>('cancelOrder');
const orderStatusQuery = defineQuery<string>('orderStatus');
// Workflow
export async function orderWorkflow(orderId: string): Promise<OrderResult> {
let status = 'validating';
let cancelled = false;
setHandler(orderStatusQuery, () => status);
setHandler(cancelOrderSignal, ({ reason }) => {
cancelled = true;
status = `cancelled: ${reason}`;
});
// Шаг 1: Валидация
status = 'validating';
const validation = await validateOrder(orderId);
if (!validation.valid) {
return { success: false, reason: validation.reason };
}
if (cancelled) return { success: false, reason: 'Cancelled before reservation' };
// Шаг 2: Резервирование товара
status = 'reserving';
let inventoryReserved = false;
try {
await reserveInventory(orderId, validation.items);
inventoryReserved = true;
} catch (e) {
return { success: false, reason: 'Insufficient stock' };
}
if (cancelled) {
await releaseInventory(orderId);
return { success: false, reason: 'Cancelled' };
}
// Шаг 3: Ожидание подтверждения оплаты (до 30 минут)
status = 'awaiting_payment';
let paymentId: string | null = null;
setHandler(paymentConfirmedSignal, ({ paymentId: pid }) => {
paymentId = pid;
});
// Ждём сигнал или таймаут
await sleep('30 minutes');
if (!paymentId) {
await releaseInventory(orderId);
return { success: false, reason: 'Payment timeout' };
}
// Шаг 4: Обработка платежа
status = 'processing_payment';
try {
await processPayment(orderId, paymentId);
} catch (e) {
await releaseInventory(orderId);
return { success: false, reason: 'Payment failed' };
}
// Шаг 5: Подтверждение
status = 'completed';
await sendConfirmation(orderId);
return { success: true, orderId };
}
Activities — реальные операции
// activities.ts
export async function validateOrder(orderId: string): Promise<ValidationResult> {
// Здесь реальный HTTP-запрос или вызов БД
const order = await orderRepository.findById(orderId);
if (!order) throw new ApplicationFailure(`Order ${orderId} not found`);
const itemsValid = await checkItemsAvailability(order.items);
return { valid: itemsValid, items: order.items, reason: itemsValid ? null : 'Items unavailable' };
}
export async function processPayment(orderId: string, paymentId: string): Promise<void> {
const result = await stripeService.capturePayment(paymentId);
if (result.status !== 'succeeded') {
throw new ApplicationFailure(`Payment capture failed: ${result.failureMessage}`);
}
await orderRepository.markAsPaid(orderId, paymentId);
}
Worker
// worker.ts
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'orders',
maxConcurrentActivityTaskExecutions: 50,
maxConcurrentWorkflowTaskExecutions: 50,
});
await worker.run();
Запуск Workflow и отправка сигналов
// client.ts
import { Client } from '@temporalio/client';
const client = new Client();
// Старт workflow
const handle = await client.workflow.start(orderWorkflow, {
taskQueue: 'orders',
workflowId: `order-${orderId}`,
args: [orderId],
});
// Отправка сигнала при подтверждении оплаты (из webhook Stripe)
await client.workflow.getHandle(`order-${orderId}`)
.signal(paymentConfirmedSignal, { paymentId: stripePaymentId });
// Запрос текущего статуса
const status = await client.workflow.getHandle(`order-${orderId}`)
.query(orderStatusQuery);
console.log('Order status:', status); // 'awaiting_payment'
Versioning
При изменении workflow-кода нужна версионизация, чтобы не сломать активные экземпляры:
import { patched } from '@temporalio/workflow';
export async function orderWorkflow(orderId: string) {
// Старые экземпляры используют старый путь
// Новые — новый
if (patched('add-fraud-check')) {
await fraudCheck(orderId);
}
await validateOrder(orderId);
// ...
}
Сроки реализации
- Temporal Server (Docker) + один workflow с 3–5 activities — 1–2 недели
- Сложный workflow с сигналами, таймерами и versioning — 2–4 недели
- Миграция существующего saga/queue-based кода на Temporal — 1–2 месяца







