Реализация Serverless File Processing (Lambda + S3 trigger)
Lambda + S3 trigger — классический serverless паттерн для обработки файлов. Файл загружается в S3, событие автоматически запускает Lambda, которая обрабатывает файл и сохраняет результат. Не нужен постоянно работающий сервер, масштабирование автоматическое.
Типичные сценарии
- Генерация thumbnail при загрузке изображения
- Конвертация видео в разные форматы и разрешения
- Обработка CSV/Excel файлов, импорт данных в БД
- PDF-генерация из шаблонов
- Антивирусное сканирование загружаемых файлов
- OCR и извлечение текста из документов
- Трансформация данных (XML → JSON, нормализация)
Базовая архитектура
[Пользователь] → S3 upload → [S3 Event Notification]
↓
[Lambda Function]
↓
[Обработанный файл → S3 Output]
[Метаданные → DynamoDB]
[Нотификация → SQS/SNS]
# S3 bucket для входящих файлов
resource "aws_s3_bucket" "uploads" {
bucket = "myapp-uploads"
}
# S3 bucket для обработанных файлов
resource "aws_s3_bucket" "processed" {
bucket = "myapp-processed"
}
# Lambda уведомление от S3
resource "aws_s3_bucket_notification" "upload_trigger" {
bucket = aws_s3_bucket.uploads.id
lambda_function {
lambda_function_arn = aws_lambda_function.processor.arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "images/" # Только файлы в этой папке
filter_suffix = ".jpg" # Только JPG файлы
}
}
Обработчик Lambda
import boto3
import json
import os
from urllib.parse import unquote_plus
from PIL import Image
import io
s3 = boto3.client('s3')
def handler(event, context):
results = []
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = unquote_plus(record['s3']['object']['key'])
try:
result = process_image(bucket, key)
results.append({'key': key, 'status': 'success', **result})
except Exception as e:
# Логировать, но не падать — другие файлы могут обработаться
print(f"Error processing {key}: {e}")
results.append({'key': key, 'status': 'error', 'error': str(e)})
return results
def process_image(bucket: str, key: str) -> dict:
# Скачать оригинал
obj = s3.get_object(Bucket=bucket, Key=key)
image_data = obj['Body'].read()
image = Image.open(io.BytesIO(image_data))
thumbnails = {}
for size_name, (width, height) in [('sm', (150, 150)), ('md', (400, 400)), ('lg', (800, 800))]:
thumb = image.copy()
thumb.thumbnail((width, height), Image.LANCZOS)
buffer = io.BytesIO()
thumb.save(buffer, format=image.format or 'JPEG', quality=85)
buffer.seek(0)
output_key = key.replace('images/', f'thumbnails/{size_name}/')
s3.put_object(
Bucket=os.environ['OUTPUT_BUCKET'],
Key=output_key,
Body=buffer,
ContentType=f'image/{(image.format or "JPEG").lower()}'
)
thumbnails[size_name] = output_key
return {'thumbnails': thumbnails, 'original_size': image.size}
Обработка больших файлов
Lambda имеет ограничения: /tmp до 10GB, timeout до 15 минут, память до 10GB. Для файлов >100MB — стриминговая обработка:
import boto3
import csv
import io
def process_large_csv(bucket: str, key: str):
s3 = boto3.client('s3')
# StreamingBody — читаем по частям без загрузки в память
obj = s3.get_object(Bucket=bucket, Key=key)
batch = []
batch_size = 1000
for line in obj['Body'].iter_lines():
row = line.decode('utf-8')
batch.append(parse_csv_row(row))
if len(batch) >= batch_size:
save_batch_to_db(batch)
batch = []
if batch:
save_batch_to_db(batch)
Для видеотранскодинга — AWS MediaConvert или Elastic Transcoder вместо Lambda (не ограничены по времени).
Обработка ошибок и DLQ
S3 event notifications не поддерживают DLQ напрямую. Надёжная схема:
S3 → SNS Topic → SQS Queue → Lambda
↓ (после maxReceiveCount)
SQS DLQ
resource "aws_s3_bucket_notification" "upload_trigger" {
bucket = aws_s3_bucket.uploads.id
topic {
topic_arn = aws_sns_topic.file_events.arn
events = ["s3:ObjectCreated:*"]
}
}
resource "aws_sns_topic_subscription" "to_sqs" {
topic_arn = aws_sns_topic.file_events.arn
protocol = "sqs"
endpoint = aws_sqs_queue.file_processing.arn
}
Конфигурация Lambda для обработки файлов
resource "aws_lambda_function" "processor" {
filename = "processor.zip"
function_name = "file-processor"
role = aws_iam_role.processor.arn
handler = "handler.handler"
runtime = "python3.12"
timeout = 300 # 5 минут
memory_size = 1024 # 1GB для обработки изображений
ephemeral_storage {
size = 2048 # 2GB /tmp для временных файлов
}
environment {
variables = {
OUTPUT_BUCKET = aws_s3_bucket.processed.bucket
}
}
}
Мониторинг и метрики
- Число обработанных файлов в час
- Среднее время обработки по типу файла
- Error rate + содержимое DLQ
- Lambda duration distribution (outliers = проблемные файлы)
CloudWatch Dashboard с этими метриками + алерт при росте DLQ.
Сроки реализации
- Базовый S3 trigger + Lambda обработчик — 1-2 дня
- Надёжная схема (SNS + SQS + DLQ) — 1-2 дня
- Обработка конкретного типа файлов — 2-5 дней
- Мониторинг + тестирование — 1-2 дня







