Разработка системы репликации рыночных данных
Репликация рыночных данных решает задачу надёжного распространения market feed от первичных источников (биржи) к множеству потребителей в разных географических локациях или изолированных средах. Это не просто копирование — это обеспечение консистентности, отказоустойчивости и масштабируемости потока данных.
Зачем нужна репликация
Торговые системы часто имеют несколько компонентов, работающих в разных окружениях:
- Production trading — на серверах близко к биржам (co-location или низкий latency)
- Research/backtesting — в дата-центре с большими объёмами хранилища
- Risk management — в защищённой сети с ограниченным доступом
- Analytics dashboards — доступны широкой аудитории
Каждое из этих окружений должно получать одинаковые данные без перегрузки источника.
Топологии репликации
Hub-and-Spoke — один primary-агрегатор собирает данные с бирж, несколько replica-узлов подписываются на него. Простая реализация, single point of failure на hub.
Chain Replication — данные передаются по цепочке: биржа → primary → secondary → tertiary. Низкая нагрузка на первичный источник, высокий cumulative latency.
Pub-Sub через Kafka — primary пишет в Kafka, любое количество consumer groups читает независимо. Наиболее гибкий вариант для production.
Kafka как backbone репликации
Kafka идеально подходит для репликации market data:
- Durability — данные хранятся на диске, consumer может перечитать любой исторический период
- Scalability — горизонтальное масштабирование через партиционирование
- Consumer Groups — разные потребители читают один и тот же топик независимо
- Exactly-once delivery — при правильной настройке idempotent producer
Topic: market.trades.binance.BTCUSDT
Partition 0: trades (all, ordered by time)
Topic: market.orderbook.binance.BTCUSDT
Partition 0: snapshots + diffs (ordered by update_id)
Topic: market.candles.binance.BTCUSDT.1m
Partition 0: 1-minute OHLCV (ordered by candle time)
Именование топиков: {data_type}.{exchange}.{symbol}.{interval} — позволяет легко фильтровать нужные данные.
Гарантии доставки
В системах market data чаще всего используется at-least-once delivery: лучше получить дублирующееся сообщение, чем потерять данные. Потребители идемпотентны: дедупликация по trade_id или update_id.
Для критических компонентов (risk management, позиционный учёт) — exactly-once через Kafka Transactions:
from confluent_kafka import Producer
producer = Producer({
'bootstrap.servers': 'kafka:9092',
'enable.idempotence': True,
'transactional.id': 'market-data-producer-1',
'acks': 'all'
})
producer.init_transactions()
def publish_trade_batch(trades: list[Trade]):
producer.begin_transaction()
try:
for trade in trades:
producer.produce(
topic=f'market.trades.{trade.exchange}.{trade.symbol}',
key=trade.symbol.encode(),
value=serialize(trade)
)
producer.commit_transaction()
except Exception as e:
producer.abort_transaction()
raise
Cross-datacenter репликация
Kafka MirrorMaker 2 реплицирует топики между кластерами:
# mirrormaker2.properties
clusters = us-east, eu-west
us-east.bootstrap.servers = kafka-us:9092
eu-west.bootstrap.servers = kafka-eu:9092
us-east->eu-west.enabled = true
us-east->eu-west.topics = market\.*
us-east->eu-west.replication.factor = 2
EU-кластер получает реплику всех market.* топиков с небольшой задержкой (обычно 50–200ms для трансатлантической репликации).
Управление retention и компрессией
Market data накапливается быстро. Kafka retention политики:
# Для tick-данных: 7 дней, после — удалять
log.retention.hours=168
# Для daily OHLCV: бесконечно (используем размерный лимит)
log.retention.bytes=10737418240 # 10 GB на партицию
# Log compaction для order book: сохранять только последнее состояние на уровень цены
log.cleanup.policy=compact
Compression на уровне Kafka: compression.type=zstd сжимает market data на 40–70% без значимых накладных расходов по CPU.
Мониторинг репликации
Ключевые метрики:
| Метрика | Что показывает |
|---|---|
| Consumer lag | Отставание потребителей от producer |
| Replication latency | Задержка между primary и replica кластерами |
| Producer send rate | Скорость публикации (сообщений/сек) |
| Bytes in/out rate | Пропускная способность |
| Under-replicated partitions | Партиции с недостаточной репликацией |
Consumer lag > N минут для торгового бота — критический алерт. Для аналитической системы — warning.
Schema Registry и совместимость форматов
При эволюции схемы данных (добавление полей, изменение типов) важно не сломать потребителей. Confluent Schema Registry + Avro обеспечивают schema evolution с проверкой совместимости:
{
"type": "record",
"name": "Trade",
"namespace": "com.exchange.market",
"fields": [
{"name": "exchange", "type": "string"},
{"name": "symbol", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "price", "type": {"type": "bytes", "logicalType": "decimal", "precision": 24, "scale": 8}},
{"name": "quantity", "type": {"type": "bytes", "logicalType": "decimal", "precision": 24, "scale": 8}},
{"name": "side", "type": {"type": "enum", "name": "Side", "symbols": ["BUY", "SELL"]}},
{"name": "is_maker", "type": ["null", "boolean"], "default": null}
]
}
Новые опциональные поля с default null — backward compatible изменение. Старые потребители продолжают работать с новой схемой.







