Реализация Serverless Event-Driven архитектуры
Event-driven архитектура на serverless — это система, где компоненты общаются через события, а не прямые вызовы. Lambda функция не знает, кто ещё подписан на результат её работы. Это обеспечивает слабую связность, независимый масштаб и возможность добавлять новых потребителей без изменения источника.
Базовые концепции
Event Source — источник событий: API Gateway (HTTP запрос), S3 (загрузка файла), DynamoDB Streams (изменение записи), SQS (сообщение в очереди), EventBridge (кастомное событие), Kinesis (поток данных).
Event Bridge (шина событий) — маршрутизатор событий. Источник публикует событие, шина доставляет его нужным потребителям по правилам.
Consumer (Lambda) — функция, реагирующая на событие.
Архитектура на примере e-commerce
Обработка заказа без event-driven: PlaceOrder → ValidateInventory → ProcessPayment → SendEmail → UpdateAnalytics — всё последовательно, тесно связано.
С event-driven:
[Client] → PlaceOrder Lambda
↓
EventBridge: order.created
/ | \
ValidateInv SendEmail Analytics
↓
EventBridge: inventory.reserved
↓
ProcessPayment
↓
EventBridge: payment.processed
/ \
FulfillOrder SendReceipt
Каждый сервис реагирует на события независимо. Новый сервис (например, fraud detection) подписывается на order.created без изменений существующего кода.
AWS EventBridge: реализация
# Кастомная шина событий
resource "aws_cloudwatch_event_bus" "orders" {
name = "orders-bus"
}
# Правило маршрутизации
resource "aws_cloudwatch_event_rule" "order_created" {
name = "order-created"
event_bus_name = aws_cloudwatch_event_bus.orders.name
event_pattern = jsonencode({
"detail-type": ["OrderCreated"],
"source": ["com.company.orders"]
})
}
resource "aws_cloudwatch_event_target" "process_inventory" {
rule = aws_cloudwatch_event_rule.order_created.name
event_bus_name = aws_cloudwatch_event_bus.orders.name
arn = aws_lambda_function.validate_inventory.arn
}
resource "aws_cloudwatch_event_target" "send_confirmation" {
rule = aws_cloudwatch_event_rule.order_created.name
event_bus_name = aws_cloudwatch_event_bus.orders.name
arn = aws_lambda_function.send_email.arn
}
Публикация события из Lambda:
import boto3
import json
from datetime import datetime
events = boto3.client('events')
def publish_order_created(order: dict):
events.put_events(
Entries=[{
'EventBusName': 'orders-bus',
'Source': 'com.company.orders',
'DetailType': 'OrderCreated',
'Detail': json.dumps({
'orderId': order['id'],
'customerId': order['customer_id'],
'items': order['items'],
'totalAmount': order['total'],
'timestamp': datetime.utcnow().isoformat()
}),
'Time': datetime.utcnow()
}]
)
SQS для надёжной доставки
EventBridge + SQS = отказоустойчивая доставка с retry и dead letter queue:
resource "aws_sqs_queue" "inventory_updates" {
name = "inventory-updates"
visibility_timeout_seconds = 300
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.inventory_dlq.arn
maxReceiveCount = 3 # После 3 неудачных попыток → DLQ
})
}
resource "aws_lambda_event_source_mapping" "inventory_processor" {
event_source_arn = aws_sqs_queue.inventory_updates.arn
function_name = aws_lambda_function.process_inventory.arn
batch_size = 10
function_response_types = ["ReportBatchItemFailures"]
}
ReportBatchItemFailures — только неудачные сообщения из batch возвращаются в очередь, успешные не повторяются.
Обработчик с partial failure
def handler(event, context):
failed_message_ids = []
for record in event['Records']:
try:
process_message(json.loads(record['body']))
except Exception as e:
# Только этот record пойдёт в retry, остальные — ОК
failed_message_ids.append({'itemIdentifier': record['messageId']})
return {'batchItemFailures': failed_message_ids}
Идемпотентность
В event-driven системах события могут доставляться дважды (at-least-once delivery). Каждый обработчик должен быть идемпотентным:
import boto3
dynamodb = boto3.resource('dynamodb')
processed_events = dynamodb.Table('processed_events')
def handler(event, context):
for record in event['Records']:
event_id = record['messageId']
# Проверить, не обработано ли событие уже
try:
processed_events.put_item(
Item={'event_id': event_id, 'ttl': int(time.time()) + 86400},
ConditionExpression='attribute_not_exists(event_id)'
)
except processed_events.meta.client.exceptions.ConditionalCheckFailedException:
continue # Уже обработано
process_event(record)
Мониторинг event-driven системы
Ключевые метрики:
- Event lag (SQS ApproximateAgeOfOldestMessage) — насколько свежие события обрабатываются
- DLQ depth — число событий в dead letter queue (ненулевое значение = проблема)
- Processing rate vs production rate — успевает ли система потреблять события
- End-to-end latency — время от события до результата через всю цепочку
Сроки реализации
- Проектирование event schema + шины — 2-3 дня
- EventBridge настройка + правила маршрутизации — 2-3 дня
- SQS + DLQ + Lambda event sources — 2-3 дня
- Идемпотентность обработчиков — 2-4 дня
- Distributed tracing + мониторинг — 2-3 дня
- Интеграционное тестирование — 2-4 дня







