Разработка платформы для стриминга прямых трансляций
Live-стриминг — это решение проблемы доставки видео в реальном времени тысячам и миллионам зрителей с минимальной задержкой. Twitch держит ~2–5 секунд задержки, YouTube Live — 5–30 в зависимости от режима. Построить то же самое с нуля — задача нескольких инженеров на несколько месяцев. Разберём, что внутри.
Схема доставки и протоколы
Стример → Ingest → Transcoding → CDN → Viewer
Входящий поток от стримера — обычно RTMP (OBS, StreamLabs, XSplit все умеют RTMP из коробки). На выходе к зрителю — HLS или DASH для браузера, WebRTC для ultra-low-latency (< 1 сек).
OBS/FFMPEG → RTMP → Nginx-RTMP/SRS/Wowza → FFmpeg transcoding
↓
HLS segments → S3/CDN
WebRTC → Selective Forwarding Unit
Ingest-сервер на базе SRS
SRS (Simple Realtime Server) — опенсорс, Go, отлично держит нагрузку:
# docker-compose.yml фрагмент
services:
srs:
image: ossrs/srs:5
ports:
- "1935:1935" # RTMP
- "1985:1985" # HTTP API
- "8080:8080" # HLS
volumes:
- ./srs.conf:/usr/local/srs/conf/srs.conf
# srs.conf
listen 1935;
max_connections 1000;
daemon off;
http_server {
enabled on;
listen 8080;
dir ./objs/nginx/html;
}
vhost __defaultVhost__ {
# Хук: уведомляем backend о начале/конце трансляции
http_hooks {
enabled on;
on_publish http://api:8000/hooks/stream/start;
on_unpublish http://api:8000/hooks/stream/stop;
on_play http://api:8000/hooks/stream/view;
}
hls {
enabled on;
hls_path ./objs/nginx/html;
hls_fragment 2; # 2 секунды — баланс задержки и стабильности
hls_window 10; # 10 сегментов в окне
}
transcode {
enabled on;
ffmpeg /usr/local/bin/ffmpeg;
engine hd {
enabled on;
vcodec libx264;
vbitrate 2000;
vfps 30;
vwidth 1280; vheight 720;
acodec aac;
abitrate 128;
output rtmp://localhost:1935/[app]/[stream]_720p;
}
engine sd {
enabled on;
vcodec libx264;
vbitrate 800;
vfps 30;
vwidth 854; vheight 480;
acodec aac;
abitrate 96;
output rtmp://localhost:1935/[app]/[stream]_480p;
}
}
}
Аутентификация стримера
Стример публикует поток по stream key. Нельзя принимать RTMP от неизвестных источников:
# FastAPI: хук для SRS on_publish
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
class PublishHook(BaseModel):
action: str
app: str
stream: str # stream key от стримера
param: str # query string
@app.post("/hooks/stream/start")
async def on_stream_start(hook: PublishHook):
# Валидируем stream key
streamer = await db.fetchrow(
"SELECT id, user_id, is_active FROM stream_keys WHERE key = $1",
hook.stream
)
if not streamer or not streamer['is_active']:
raise HTTPException(status_code=403, detail="Invalid stream key")
# Запускаем трансляцию в БД
await db.execute("""
INSERT INTO live_streams (user_id, stream_key_id, started_at, status)
VALUES ($1, $2, NOW(), 'live')
ON CONFLICT (stream_key_id) DO UPDATE SET started_at = NOW(), status = 'live'
""", streamer['user_id'], streamer['id'])
# Уведомляем подписчиков через WebSocket
await notify_followers(streamer['user_id'], 'stream_started')
return {"code": 0} # SRS ожидает code=0 для разрешения
HLS-сегменты в S3
SRS пишет сегменты локально, их нужно синхронизировать в S3 для CDN:
# inotifywait: мониторим директорию с сегментами, пушим в S3
inotifywait -m -e close_write /var/srs/hls/ --format '%f' |
while read filename; do
if [[ "$filename" == *.ts || "$filename" == *.m3u8 ]]; then
# m3u8 с коротким cache — часто меняется
# ts с длинным — immutable
if [[ "$filename" == *.m3u8 ]]; then
cache="max-age=2"
else
cache="max-age=86400,immutable"
fi
aws s3 cp "/var/srs/hls/$filename" "s3://live-streams/hls/$filename" \
--cache-control "$cache" \
--content-type "$(get_mime $filename)"
fi
done
Или через s3fs / rclone mount — проще, но менее контролируемо.
Чат в реальном времени
Чат трансляции — обязательный элемент. WebSocket через Redis Pub/Sub:
// Node.js: WebSocket-сервер для чата
import { WebSocketServer } from 'ws';
import { createClient } from 'redis';
const wss = new WebSocketServer({ port: 3001 });
const redis = createClient({ url: process.env.REDIS_URL });
const redisSub = redis.duplicate();
await redis.connect();
await redisSub.connect();
interface ChatMessage {
type: 'message' | 'emote' | 'sub' | 'ban';
streamId: string;
userId: string;
username: string;
text: string;
badges: string[];
timestamp: number;
}
// Подписываемся на канал трансляции
wss.on('connection', (ws, req) => {
const streamId = new URL(req.url!, 'ws://x').searchParams.get('stream');
if (!streamId) return ws.close();
const channel = `chat:${streamId}`;
// Слушаем Redis Pub/Sub для этого стрима
redisSub.subscribe(channel, (message) => {
if (ws.readyState === ws.OPEN) {
ws.send(message);
}
});
ws.on('message', async (data) => {
const msg: ChatMessage = JSON.parse(data.toString());
// Антиспам: rate limit на пользователя
const key = `chat_limit:${msg.userId}:${streamId}`;
const count = await redis.incr(key);
if (count === 1) await redis.expire(key, 5);
if (count > 20) { // 20 сообщений за 5 секунд — слишком много
ws.send(JSON.stringify({ type: 'slowmode', waitMs: 5000 }));
return;
}
// Сохраняем в Redis Stream (скользящее окно 1000 сообщений)
await redis.xAdd(`stream_chat:${streamId}`, '*', msg as any, {
TRIM: { strategy: 'MAXLEN', threshold: 1000 }
});
// Публикуем всем подключённым
await redis.publish(channel, JSON.stringify(msg));
});
ws.on('close', () => {
redisSub.unsubscribe(channel);
});
});
Запись трансляций
VOD (Video on Demand) после окончания стрима — стандартное ожидание:
# Celery task: после завершения стрима конвертируем в VOD
@app.task
def process_vod(stream_id: int):
stream = LiveStream.objects.get(id=stream_id)
# Собираем TS-сегменты в один файл
segments = sorted(
glob(f"/var/srs/hls/{stream.stream_key}/*.ts"),
key=lambda f: int(Path(f).stem.split('_')[-1])
)
concat_list = "/tmp/vod_concat.txt"
with open(concat_list, 'w') as f:
for s in segments: f.write(f"file '{s}'\n")
raw_mp4 = f"/tmp/vod_{stream_id}_raw.mp4"
subprocess.run([
'ffmpeg', '-f', 'concat', '-safe', '0',
'-i', concat_list,
'-c', 'copy',
raw_mp4
], check=True)
# Перекодируем для VOD (оптимизация для веба: faststart)
vod_mp4 = f"/var/vod/{stream_id}.mp4"
subprocess.run([
'ffmpeg', '-i', raw_mp4,
'-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
'-c:a', 'aac', '-b:a', '128k',
'-movflags', '+faststart', # moov atom в начало для псевдостриминга
vod_mp4
], check=True)
stream.vod_path = vod_mp4
stream.status = 'ended'
stream.save()
Масштабирование: многосерверный ingest
Один ingest-сервер — SPOF. Для продакшна нужен кластер с балансировкой:
DNS → Load Balancer (GeoDNS) → Ingest cluster
↓
Transcoding workers (GPU)
↓
HLS → S3 → CDN
Стримеры направляются на ближайший ingest по GeoDNS. Каждый ingest пишет в общее объектное хранилище или реплицирует сегменты синхронно.
Сроки
MVP с RTMP-приёмом, HLS-доставкой, WebSocket-чатом и записью в VOD — 10–12 недель. Добавление транскодинга нескольких качеств, gift-subscriptions, модерации чата, мобильного плеера — ещё 8–10 недель. Масштабирование до 10k+ concurrent viewers, балансировка ingest, CDN с origin shield — отдельный этап архитектурного проектирования.







