Реализация CQRS (Command Query Responsibility Segregation) для веб-приложения
CQRS разделяет модели записи (Commands) и чтения (Queries) в приложении. Вместо одной модели, которая одновременно и принимает изменения, и отвечает на запросы, создаются две независимые стороны: Command Side обрабатывает бизнес-операции и изменяет состояние, Query Side возвращает денормализованные представления для UI.
Мотивация
Стандартная CRUD-архитектура ломается при росте нагрузки, когда:
- Читают на порядки чаще, чем пишут — нужно масштабировать независимо
- Форма данных для записи отличается от формы для чтения (нормализация vs. денормализация)
- Сложные бизнес-правила при записи конфликтуют с производительностью запросов
- Нужна история изменений или аудит
CQRS решает эти задачи ценой увеличения сложности системы. Для простого CRUD-приложения он избыточен.
Структура Command Side
Command — намерение изменить состояние. Содержит всё необходимое для выполнения операции:
// Commands — неизменяемые объекты намерения
interface CreateOrderCommand {
readonly type: 'CreateOrder';
readonly customerId: string;
readonly items: Array<{ productId: string; quantity: number; price: number }>;
readonly shippingAddress: Address;
}
interface CancelOrderCommand {
readonly type: 'CancelOrder';
readonly orderId: string;
readonly reason: string;
readonly requestedBy: string;
}
Command Handler — обрабатывает один тип команды, содержит бизнес-логику:
class CreateOrderCommandHandler {
constructor(
private orderRepo: OrderRepository,
private productRepo: ProductRepository,
private eventBus: EventBus
) {}
async handle(command: CreateOrderCommand): Promise<string> {
// 1. Загрузить агрегат или создать новый
const order = Order.create(command.customerId);
// 2. Применить бизнес-правила
for (const item of command.items) {
const product = await this.productRepo.findById(item.productId);
if (!product.isAvailable(item.quantity)) {
throw new InsufficientStockError(item.productId);
}
order.addItem(item);
}
order.setShippingAddress(command.shippingAddress);
order.submit();
// 3. Сохранить агрегат
await this.orderRepo.save(order);
// 4. Опубликовать доменные события
await this.eventBus.publishAll(order.pullDomainEvents());
return order.id;
}
}
Command Bus — маршрутизирует команды к соответствующим хендлерам:
class CommandBus {
private handlers = new Map<string, CommandHandler>();
register<T extends Command>(type: string, handler: CommandHandler<T>) {
this.handlers.set(type, handler);
}
async dispatch<T extends Command>(command: T): Promise<unknown> {
const handler = this.handlers.get(command.type);
if (!handler) throw new Error(`No handler for ${command.type}`);
// middleware: validation, logging, retry
return handler.handle(command);
}
}
Структура Query Side
Query — запрос данных без побочных эффектов:
interface GetOrderDetailsQuery {
readonly type: 'GetOrderDetails';
readonly orderId: string;
}
interface GetCustomerOrdersQuery {
readonly type: 'GetCustomerOrders';
readonly customerId: string;
readonly status?: OrderStatus;
readonly page: number;
readonly perPage: number;
}
Read Model — денормализованное представление, оптимизированное под конкретный UI:
interface OrderDetailsReadModel {
id: string;
status: string;
customer: { id: string; name: string; email: string };
items: Array<{
productId: string;
productName: string;
quantity: number;
unitPrice: number;
subtotal: number;
}>;
shippingAddress: Address;
totals: { subtotal: number; shipping: number; tax: number; total: number };
timeline: Array<{ event: string; occurredAt: Date; actor: string }>;
createdAt: Date;
updatedAt: Date;
}
Query Handler — читает напрямую из Read Model, без загрузки агрегата:
class GetOrderDetailsQueryHandler {
constructor(private db: Database) {}
async handle(query: GetOrderDetailsQuery): Promise<OrderDetailsReadModel> {
return this.db.queryOne(`
SELECT
o.id, o.status, o.created_at, o.updated_at,
c.id as customer_id, c.name as customer_name, c.email,
json_agg(json_build_object(
'productId', oi.product_id,
'productName', p.name,
'quantity', oi.quantity,
'unitPrice', oi.unit_price,
'subtotal', oi.quantity * oi.unit_price
)) as items,
o.shipping_address,
o.total_amount
FROM orders_view o
JOIN customers c ON c.id = o.customer_id
JOIN order_items_view oi ON oi.order_id = o.id
JOIN products p ON p.id = oi.product_id
WHERE o.id = $1
GROUP BY o.id, c.id
`, [query.orderId]);
}
}
Синхронизация Read Model
Read Model обновляется асинхронно через доменные события. Eventual consistency — Read Model может кратковременно отставать от Write Model.
class OrderReadModelUpdater {
// Подписан на события из EventBus или Kafka
async on(event: DomainEvent) {
switch (event.eventType) {
case 'OrderCreated':
await this.db.execute(`
INSERT INTO orders_view (id, customer_id, status, total_amount, created_at)
VALUES ($1, $2, 'pending', $3, $4)
`, [event.aggregateId, event.payload.customerId,
event.payload.total, event.occurredAt]);
break;
case 'OrderStatusChanged':
await this.db.execute(`
UPDATE orders_view
SET status = $2, updated_at = $3
WHERE id = $1
`, [event.aggregateId, event.payload.newStatus, event.occurredAt]);
break;
case 'OrderItemAdded':
await this.db.execute(`
INSERT INTO order_items_view (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)
ON CONFLICT (order_id, product_id) DO UPDATE
SET quantity = EXCLUDED.quantity
`, [event.aggregateId, event.payload.productId,
event.payload.quantity, event.payload.price]);
break;
}
}
}
Масштабирование
Write Side — вертикальное масштабирование основной БД, шардирование по aggregate_id.
Read Side — горизонтальное масштабирование read replicas PostgreSQL, Redis-кеш для hot data, ElasticSearch для полнотекстового поиска, отдельная таблица или схема под каждый Read Model.
Уровни реализации CQRS
| Уровень | Описание | Сложность |
|---|---|---|
| Логическое разделение | Отдельные методы/классы для commands и queries | Низкая |
| Разные модели данных | Commands → нормализованная БД, Queries → денормализованные views | Средняя |
| Разные базы данных | Write DB (PostgreSQL), Read DB (Redis/Elastic) | Высокая |
| Разные сервисы | Write и Read — отдельные микросервисы с независимым деплоем | Очень высокая |
Начинать стоит с логического разделения. Переходить к разным БД только при доказанной необходимости.
Сроки реализации
- Рефакторинг существующего приложения к Command/Query разделению — 1–2 недели
- Новое приложение с CQRS от старта — 2–3 недели
- Полный CQRS + Event Sourcing + async Read Models — 4–8 недель в зависимости от сложности домена







