Разработка WebSocket-агрегатора рыночных данных

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка WebSocket-агрегатора рыночных данных
Средний
~5 дней
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    860

Разработка WebSocket-агрегатора рыночных данных

WebSocket-агрегатор — это сервис, который поддерживает постоянные соединения с биржами и транслирует нормализованные рыночные данные downstream-потребителям: торговым ботам, дашбордам, системам риск-менеджмента. Ключевой параметр качества — latency от биржи до потребителя и устойчивость к разрывам соединений.

Топология соединений

Каждая биржа имеет собственные ограничения на WebSocket:

Биржа Max streams / conn Ping interval Max connections
Binance 1024 3 мин Не ограничено
Bybit 10 тем / conn 20 сек Не ограничено
OKX 240 каналов / conn 30 сек Не ограничено
Kraken Не задокументировано Adaptive Не ограничено

Агрегатор создаёт столько соединений, сколько нужно для охвата всех подписок, распределяя символы по соединениям с учётом лимитов.

Connection Manager

class ConnectionManager:
    def __init__(self, max_per_conn: int = 900):
        self.connections: list[WSConnection] = []
        self.max_per_conn = max_per_conn
        self.subscriptions: dict[str, WSConnection] = {}

    async def subscribe(self, channels: list[str]):
        for channel in channels:
            conn = self._find_or_create_connection()
            await conn.subscribe(channel)
            self.subscriptions[channel] = conn

    def _find_or_create_connection(self) -> WSConnection:
        for conn in self.connections:
            if conn.subscription_count < self.max_per_conn:
                return conn
        new_conn = WSConnection(self.on_message, self.on_disconnect)
        self.connections.append(new_conn)
        return new_conn

    async def on_disconnect(self, conn: WSConnection):
        # Экспоненциальный backoff и переподписка
        await asyncio.sleep(conn.backoff.next())
        await conn.reconnect()
        await conn.resubscribe()

Heartbeat и обнаружение stale-соединений

Биржи могут "замолчать" без явного разрыва — TCP-соединение живо, но данных нет. Watchdog-таймер для каждого соединения:

class HeartbeatMonitor:
    STALE_THRESHOLD_SEC = 30

    async def watch(self, conn: WSConnection):
        while True:
            await asyncio.sleep(5)
            age = time.time() - conn.last_message_time
            if age > self.STALE_THRESHOLD_SEC:
                logger.warning(f"Stale connection detected, forcing reconnect")
                await conn.force_reconnect()

Публикация данных потребителям

Агрегатор публикует нормализованные данные через несколько каналов в зависимости от потребностей:

Redis Pub/Sub — для real-time рассылки с минимальной latency. Подходит когда данные не нужно сохранять.

Redis Streams — для надёжной доставки с возможностью чтения пропущенных сообщений (consumer groups, persistent log).

Kafka — для высоконагруженных систем с требованием гарантированной доставки и партиционирования по символу.

gRPC streaming — для прямых соединений клиент-агрегатор с low latency.

Метрики производительности

Агрегатор должен экспортировать Prometheus-метрики для мониторинга:

  • ws_messages_received_total{exchange, channel} — счётчик входящих сообщений
  • ws_message_latency_ms{exchange} — задержка от биржевого timestamp до получения
  • ws_reconnects_total{exchange} — количество переподключений
  • ws_active_connections{exchange} — текущее количество соединений
  • ws_subscription_count{exchange} — количество активных подписок

При правильной реализации на Python (asyncio) агрегатор обрабатывает 50,000–100,000 сообщений в секунду на одном ядре. На Go или Rust — на порядок больше.