Разработка MLOps-инфраструктуры для торговых AI-моделей
MLOps для трейдинга — это специализированная область, где стандартные MLOps-практики дополняются требованиями финансовой отрасли: строгие SLA на latency, audit trail для регуляторных целей, детерминированность результатов и особый подход к переобучению (нельзя допустить случайное ухудшение в момент активной торговли).
Отличия от стандартного MLOps
Latency требования: Для HFT модели должны выдавать предсказания за <1мс. Для внутридневных стратегий — <100мс. Стандартные REST API инференс-сервисы часто не подходят.
Zero-downtime переключение: Замена модели в trading hours рискованна. Нужны механизмы hot-swap без прерывания торговли.
Воспроизводимость: При аудите необходимо точно воспроизвести предсказание модели в конкретный момент времени (которая версия модели работала, какие данные использовались).
Market regime awareness: Переобучение должно учитывать текущий рыночный режим (trend, mean-reversion, high-volatility). Модель, хорошая для трендовых рынков, опасна на боковике.
Архитектура инфраструктуры
┌─────────────────────────────────────────────────────────┐
│ Data Infrastructure │
│ [Market Data Vendor] → [Kafka] → [ClickHouse/TimescaleDB]│
│ [Alternative Data] → [Feature Store] ← [Feature Pipeline]│
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Training Infrastructure │
│ [Airflow/Prefect] → [GPU Training Cluster] │
│ [MLflow] ← [Experiment Tracking] → [Model Registry] │
│ [DVC] → [Data Versioning] │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Inference Infrastructure │
│ [Model Loader] → [Low-Latency Inference Server] │
│ [Shadow Model] → [A/B Framework] → [Active Model] │
│ [Risk Management Layer] → [Execution Engine] │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Monitoring Stack │
│ [Prediction Logger] → [ClickHouse] → [Grafana] │
│ [Drift Detector] → [Alert Manager] → [PagerDuty] │
│ [P&L Attribution] → [Model Performance Dashboard] │
└─────────────────────────────────────────────────────────┘
Низколатентный инференс-сервис
import onnxruntime as ort
import numpy as np
import threading
class LowLatencyModelServer:
"""Инференс с целевой latency < 5мс"""
def __init__(self, model_path: str):
# ONNX Runtime с оптимизациями
opts = ort.SessionOptions()
opts.intra_op_num_threads = 4
opts.inter_op_num_threads = 1
opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
self.session = ort.InferenceSession(
model_path,
sess_options=opts,
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self._lock = threading.RLock()
# Warm up
dummy_input = np.zeros((1, 50), dtype=np.float32)
for _ in range(10):
self.predict(dummy_input)
def predict(self, features: np.ndarray) -> float:
with self._lock:
result = self.session.run(
None,
{'input': features.astype(np.float32)}
)
return float(result[0][0])
Pipeline переобучения с market regime detection
class TradingModelRetrainingPipeline:
def __init__(self, model_registry, risk_manager):
self.registry = model_registry
self.risk = risk_manager
def should_retrain(self, performance_metrics: dict) -> tuple[bool, str]:
# Дрифт признаков
if performance_metrics['feature_psi'] > 0.2:
return True, "Feature drift detected"
# Деградация метрик
if performance_metrics['sharpe_ratio_7d'] < 0.5:
return True, "Sharpe degradation"
# Смена рыночного режима
if self._detect_regime_change():
return True, "Market regime change"
return False, None
def safe_model_swap(self, new_model_path: str):
"""Hot-swap модели без прерывания торговли"""
# 1. Запустить shadow deployment на 2 часа
self._start_shadow_deployment(new_model_path)
# 2. Проверить agreement rate
if self._shadow_agreement_rate() < 0.90:
raise ValueError("Shadow model agreement rate too low for safe swap")
# 3. Переключить в non-trading hours (02:00-09:00)
if not self._is_safe_swap_window():
self._schedule_swap_for_night()
return
# 4. Swap
with self.risk.trading_pause(timeout_seconds=5):
self.active_model = load_model(new_model_path)
Audit trail и воспроизводимость
def log_prediction_for_audit(features, prediction, model_version, timestamp):
audit_store.insert({
'timestamp': timestamp,
'model_version': model_version,
'model_git_hash': get_model_code_hash(model_version),
'data_version': get_feature_data_version(timestamp),
'input_features': features.tolist(),
'prediction': float(prediction),
'prediction_id': str(uuid.uuid4())
})
Каждое предсказание сохраняется с версией модели, версией данных и хэшем кода — достаточно для точного воспроизведения в случае регуляторного запроса или investigation.
Сроки реализации
Базовая MLOps-инфраструктура (трекинг, registry, деплой): 4-6 недель. Полная система с мониторингом дрифта, автопереобучением и audit trail: 3-4 месяца. Это инвестиция, которая окупается при первом серьёзном инциденте с деградацией модели.







