Разработка pipeline обработки order book данных для ML
Order book данные — богатейший источник информации о рыночной структуре. Полный стакан заявок содержит информацию об ожидаемом спросе/предложении, которая недоступна из OHLCV данных. Однако объём и структура этих данных требуют специализированного pipeline.
Структура order book д��нных
Level 1 (Top of Book): лучший bid и ask с объёмами. Минимальный объём, максимальная актуальность.
Level 2 (Full Depth): все уровни стакана с объёмами. Binance предоставляет глубину 5000 уровней. Обновляется через WebSocket diff stream.
Level 3 (Full Order Feed): каждый отдельный ордер с ID. Доступен не на всех биржах, максимальная детальность.
Сбор и хранение
import asyncio
import websockets
import json
from collections import deque
import numpy as np
class OrderBookCollector:
def __init__(self, symbol, max_depth=100):
self.symbol = symbol
self.bids = {} # price -> quantity
self.asks = {}
self.max_depth = max_depth
self.snapshots = deque(maxlen=10000)
async def connect_binance(self):
url = f"wss://stream.binance.com:9443/ws/{self.symbol.lower()}@depth@100ms"
async with websockets.connect(url) as ws:
# Сначала получаем snapshot через REST
await self.fetch_snapshot()
async for msg in ws:
data = json.loads(msg)
self.process_diff_update(data)
# Сохраняем снэпшот каждые N updates
if len(self.snapshots) % 10 == 0:
self.save_snapshot()
def process_diff_update(self, data):
for bid_level in data.get('b', []):
price, qty = float(bid_level[0]), float(bid_level[1])
if qty == 0:
self.bids.pop(price, None)
else:
self.bids[price] = qty
for ask_level in data.get('a', []):
price, qty = float(ask_level[0]), float(ask_level[1])
if qty == 0:
self.asks.pop(price, None)
else:
self.asks[price] = qty
def get_features(self, n_levels=20):
"""Извлекаем ML features из текущего состояния стакана"""
sorted_bids = sorted(self.bids.items(), reverse=True)[:n_levels]
sorted_asks = sorted(self.asks.items())[:n_levels]
if not sorted_bids or not sorted_asks:
return None
mid_price = (sorted_bids[0][0] + sorted_asks[0][0]) / 2
features = {}
# Объёмы на разных уровнях
for i, (price, qty) in enumerate(sorted_bids[:10]):
features[f'bid_qty_{i}'] = qty
features[f'bid_dist_{i}'] = (mid_price - price) / mid_price
for i, (price, qty) in enumerate(sorted_asks[:10]):
features[f'ask_qty_{i}'] = qty
features[f'ask_dist_{i}'] = (price - mid_price) / mid_price
# Order Book Imbalance (OBI)
bid_vol_n = sum(qty for _, qty in sorted_bids[:5])
ask_vol_n = sum(qty for _, qty in sorted_asks[:5])
features['obi_5'] = (bid_vol_n - ask_vol_n) / (bid_vol_n + ask_vol_n + 1e-8)
bid_vol_20 = sum(qty for _, qty in sorted_bids[:20])
ask_vol_20 = sum(qty for _, qty in sorted_asks[:20])
features['obi_20'] = (bid_vol_20 - ask_vol_20) / (bid_vol_20 + ask_vol_20 + 1e-8)
# Weighted mid price
features['wmid'] = (
sorted_bids[0][0] * sorted_asks[0][1] +
sorted_asks[0][0] * sorted_bids[0][1]
) / (sorted_bids[0][1] + sorted_asks[0][1])
# Spread
features['spread'] = (sorted_asks[0][0] - sorted_bids[0][0]) / mid_price
# Depth asymmetry at multiple levels
for n in [5, 10, 20]:
bid_depth = sum(qty for _, qty in sorted_bids[:n])
ask_depth = sum(qty for _, qty in sorted_asks[:n])
features[f'depth_ratio_{n}'] = bid_depth / max(ask_depth, 1e-8)
return features
Order Book Imbalance (OBI) как торговый сигнал
OBI — наиболее исследованный признак из order book для краткосрочного прогнозирования:
def calculate_multi_level_obi(orderbook, levels=[1, 5, 10, 20]):
"""Расчёт OBI на разных глубинах"""
obi_features = {}
sorted_bids = sorted(orderbook['bids'], reverse=True)
sorted_asks = sorted(orderbook['asks'])
for n in levels:
bid_vol = sum(qty for _, qty in sorted_bids[:n])
ask_vol = sum(qty for _, qty in sorted_asks[:n])
obi_features[f'obi_{n}'] = (bid_vol - ask_vol) / (bid_vol + ask_vol + 1e-8)
# OBI slope (изменение OBI за последние K обновлений)
return obi_features
Хранение order book данных
Полный L2 order book — огромный объём данных. Стратегии хранения:
Snapshots: полный стакан каждые N секунд (например, каждые 100ms). Для 20 уровней × 2 стороны × 2 значения = 80 float значений. При 10 обновлений/сек × 86400 сек = 69M записей/день.
ClickHouse — идеально для order book данных: высокая скорость записи, эффективное колоночное хранение, быстрые агрегации.
CREATE TABLE order_book_snapshots (
timestamp DateTime64(3), -- миллисекундная точность
symbol LowCardinality(String),
exchange LowCardinality(String),
-- Bid levels (price + quantity для 20 уровней)
bid_price_0 Float32, bid_qty_0 Float32,
bid_price_1 Float32, bid_qty_1 Float32,
-- ... до bid_price_19, bid_qty_19
-- Ask levels аналогично
ask_price_0 Float32, ask_qty_0 Float32,
-- ...
-- Pre-calculated features
spread Float32,
obi_5 Float32,
obi_20 Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (symbol, timestamp)
TTL timestamp + INTERVAL 90 DAY; -- хранение 90 дней
Feature engineering из order book
def engineer_orderbook_features(snapshots_df, window_sizes=[10, 50, 100]):
"""
snapshots_df: DataFrame с историей снэпшотов стакана
"""
features = snapshots_df.copy()
# Производные от OBI
for window in window_sizes:
# Скользящее среднее OBI
features[f'obi_5_ma_{window}'] = features['obi_5'].rolling(window).mean()
# Изменение OBI
features[f'obi_5_delta_{window}'] = features['obi_5'].diff(window)
# Волатильность OBI
features[f'obi_5_std_{window}'] = features['obi_5'].rolling(window).std()
# Volume imbalance accumulation (COF - Cumulative Order Flow)
features['cof'] = features['obi_5'].cumsum()
features['cof_ma'] = features['cof'].rolling(100).mean()
features['cof_deviation'] = features['cof'] - features['cof_ma']
# Spread dynamics
features['spread_ma'] = features['spread'].rolling(50).mean()
features['spread_ratio'] = features['spread'] / features['spread_ma']
# Depth stability (изменение глубины стакана)
features['depth_change'] = features['depth_ratio_10'].diff(10)
return features
Краткосрочный прогноз mid-price из OB
Задача: предсказать изменение mid-price через N обновлений (например, через 10 обновлений стакана ≈ 1 секунда):
def create_training_data(snapshots_df, prediction_horizon=10):
features = engineer_orderbook_features(snapshots_df)
# Target: знак изменения mid-price через horizon обновлений
future_mid = snapshots_df['mid_price'].shift(-prediction_horizon)
current_mid = snapshots_df['mid_price']
target = np.sign(future_mid - current_mid) # -1, 0, 1
# Убираем строки с NaN
valid_mask = features.notna().all(axis=1) & target.notna()
return features[valid_mask], target[valid_mask]
Разрабатываем полный order book ML pipeline: WebSocket коллектор с инкрементальным обновлением, ClickHouse для хранения снэпшотов, feature engineering из OBI и depth данных, обучение краткосрочной прогностической модели (LightGBM/XGBoost) и realtime inference.







