Настройка real-time граббинга через WebSocket

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Настройка real-time граббинга через WebSocket
Средний
~2-3 дня
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1286
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Настройка real-time парсинга через WebSocket

Polling REST API раз в N секунд — неправильный инструмент для задач, требующих реакции на события. При polling с интервалом 1 секунда средняя задержка обнаружения события — 0.5 секунды. WebSocket-подписка даёт событие в момент его возникновения, задержка определяется только сетью (10–50мс до ближайшего сервера биржи). Для мониторинга цен, orderbook и on-chain событий разница принципиальна.

WebSocket протоколы бирж

Каждая биржа имеет свой протокол подписки. Паттерны схожи, детали различаются.

Binance: subscribe через JSON сообщение, stream names в формате symbol@streamType:

import asyncio
import json
import websockets

async def binance_stream(symbols: list[str]):
    streams = '/'.join([f"{s.lower()}@trade" for s in symbols])
    url = f"wss://stream.binance.com:9443/stream?streams={streams}"
    
    async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
        async for message in ws:
            data = json.loads(message)
            stream_data = data.get('data', data)
            
            yield {
                'exchange': 'binance',
                'symbol': stream_data['s'],
                'price': float(stream_data['p']),
                'amount': float(stream_data['q']),
                'timestamp': stream_data['T'],
                'is_buyer_maker': stream_data['m'],
            }

Coinbase Advanced Trade: использует subscribe message с channel и product_ids:

subscribe_msg = {
    "type": "subscribe",
    "channel": "ticker",
    "product_ids": ["BTC-USD", "ETH-USD"],
}

Kraken: требует генерацию subscription ID и имеет особенности формата ответа с парой в массиве.

Ethereum/EVM: web3.py и ethers.js WebSocket

On-chain события через WebSocket subscriptions к Ethereum ноде (Alchemy, Infura, QuickNode или собственная нода):

from web3 import AsyncWeb3, WebSocketProvider

async def subscribe_to_transfers(token_address: str):
    w3 = AsyncWeb3(WebSocketProvider(
        "wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
    ))
    
    # ERC-20 Transfer event signature hash
    transfer_sig = w3.keccak(text="Transfer(address,address,uint256)").hex()
    
    subscription_id = await w3.eth.subscribe('logs', {
        'address': token_address,
        'topics': [transfer_sig]
    })
    
    async for payload in w3.socket.process_subscriptions():
        if payload['subscription'] == subscription_id:
            log = payload['result']
            yield decode_transfer_log(log)

Ethereum JSON-RPC WebSocket поддерживает три типа подписок: newHeads (новые блоки), logs (события контрактов), newPendingTransactions (mempool транзакции).

Управление соединениями: reconnect и heartbeat

WebSocket соединения разрываются по разным причинам: timeout сервера, network hiccup, перезапуск сервиса биржи. Production система должна автоматически восстанавливаться:

import asyncio
import websockets
from datetime import datetime

class RobustWebSocketClient:
    def __init__(self, url: str, reconnect_delay: float = 1.0):
        self.url = url
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = 60.0
        self.last_message_at = None
        self.stale_threshold = 30  # секунд без сообщений = staleness
    
    async def connect_with_retry(self, on_message, on_subscribe):
        delay = self.reconnect_delay
        
        while True:
            try:
                async with websockets.connect(
                    self.url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                ) as ws:
                    await on_subscribe(ws)
                    delay = self.reconnect_delay  # сбрасываем при успехе
                    
                    async for msg in ws:
                        self.last_message_at = datetime.utcnow()
                        await on_message(msg)
                        
            except (websockets.ConnectionClosed, 
                    websockets.InvalidHandshake,
                    OSError) as e:
                print(f"Connection error: {e}, reconnecting in {delay}s")
                await asyncio.sleep(delay)
                delay = min(delay * 2, self.max_reconnect_delay)
    
    async def staleness_watchdog(self):
        """Детектирует зависшее соединение без явного разрыва"""
        while True:
            await asyncio.sleep(10)
            if self.last_message_at:
                elapsed = (datetime.utcnow() - self.last_message_at).seconds
                if elapsed > self.stale_threshold:
                    raise RuntimeError(f"Connection stale: {elapsed}s without data")

Order book: incremental updates vs снапшоты

Большинство бирж отдают orderbook через incremental updates — только изменившиеся уровни. Локальное поддержание актуального состояния orderbook:

from sortedcontainers import SortedDict

class LocalOrderBook:
    def __init__(self):
        self.bids = SortedDict(lambda k: -k)  # descending
        self.asks = SortedDict()               # ascending
        self.last_update_id = 0
    
    def apply_snapshot(self, snapshot: dict):
        self.bids.clear()
        self.asks.clear()
        for price, qty in snapshot['bids']:
            self.bids[float(price)] = float(qty)
        for price, qty in snapshot['asks']:
            self.asks[float(price)] = float(qty)
        self.last_update_id = snapshot['lastUpdateId']
    
    def apply_update(self, update: dict):
        if update['u'] <= self.last_update_id:
            return  # устаревший update, игнорируем
        
        for price, qty in update['b']:  # bids
            p, q = float(price), float(qty)
            if q == 0:
                self.bids.pop(p, None)
            else:
                self.bids[p] = q
        
        for price, qty in update['a']:  # asks
            p, q = float(price), float(qty)
            if q == 0:
                self.asks.pop(p, None)
            else:
                self.asks[p] = q
        
        self.last_update_id = update['u']
    
    def best_bid(self) -> tuple[float, float]:
        k = next(iter(self.bids))
        return k, self.bids[k]
    
    def best_ask(self) -> tuple[float, float]:
        k = next(iter(self.asks))
        return k, self.asks[k]

Важно: при старте нужно получить снапшот через REST, затем применять WebSocket updates начиная с lastUpdateId > snapshotId. Обновления до снапшота отбрасываются, пропуск в последовательности Uu требует повторного снапшота.

Масштабирование: множество пар и бирж

Одна async event loop в Python справляется с 50–200 одновременными WebSocket соединениями. Для большего числа — несколько процессов или Go-сервис (goroutines значительно легче asyncio tasks).

Fanout результатов: обработанные сообщения → Redis Pub/Sub или Kafka для downstream consumers. WebSocket handler должен минимально обрабатывать данные и быстро публиковать — тяжёлую обработку делает отдельный consumer.

async def handle_message(raw: str, redis_client):
    data = json.loads(raw)
    normalized = normalize_trade(data)
    
    # Быстрая публикация — не блокируем event loop
    await redis_client.publish(
        f"trades:{normalized['exchange']}:{normalized['symbol']}",
        json.dumps(normalized)
    )

Мониторинг здоровья

Метрики для каждого WebSocket соединения: messages per second, reconnect count, last message timestamp, lag от биржевого timestamp до processing timestamp. Grafana + Prometheus alerting на stale connections (> 60 сек без сообщений по активной паре).

Настройка real-time парсинга для 3–5 бирж с мониторингом 20–50 пар, reconnect логикой и публикацией в Redis/Kafka — 1–2 недели.