Разработка системы мониторинга деградации ML-модели
ML модели для торговли деградируют по нескольким причинам: рыночные режимы меняются, арбитражируются паттерны которые модель использовала, изменяются корреляции между активами. Мониторинг деградации позволяет обнаружить проблему до того, как она нанесёт финансовый ущерб.
Типы деградации
Concept drift: изменились отношения между features и target. Модель предсказывает правильно для «старого» рынка.
Data drift (feature drift): входные данные стали распределяться иначе, чем при обучении. Модель получает нетипичные inputs.
Label drift: само распределение target переменной изменилось (например, рынок из трендового стал боковым).
Performance degradation: качество предсказаний снизилось даже без явного drift.
Метрики для мониторинга
import numpy as np
import pandas as pd
from scipy import stats
from collections import deque
class ModelDegradationMonitor:
def __init__(self, model_id, baseline_metrics, alert_thresholds):
self.model_id = model_id
self.baseline = baseline_metrics
self.thresholds = alert_thresholds
# Rolling windows для метрик
self.predictions_buffer = deque(maxlen=500)
self.actuals_buffer = deque(maxlen=500)
self.features_buffer = deque(maxlen=1000)
def log_prediction(self, features, prediction, confidence):
self.predictions_buffer.append({
'prediction': prediction,
'confidence': confidence,
'timestamp': datetime.utcnow()
})
self.features_buffer.append(features)
def log_actual(self, actual_return):
self.actuals_buffer.append(actual_return)
def calculate_performance_metrics(self, window=100):
if len(self.predictions_buffer) < window:
return None
recent_preds = [p['prediction'] for p in list(self.predictions_buffer)[-window:]]
recent_actuals = list(self.actuals_buffer)[-window:]
if len(recent_actuals) < window:
return None
# Directional accuracy
dir_accuracy = np.mean(
np.sign(recent_preds) == np.sign(recent_actuals)
)
# Confidence calibration: при высокой уверенности должна быть высокая точность
high_conf_preds = [
(p['prediction'], a)
for p, a in zip(list(self.predictions_buffer)[-window:], recent_actuals)
if p['confidence'] > 0.65
]
if high_conf_preds:
high_conf_accuracy = np.mean([
np.sign(pred) == np.sign(actual)
for pred, actual in high_conf_preds
])
else:
high_conf_accuracy = None
return {
'directional_accuracy': dir_accuracy,
'high_conf_accuracy': high_conf_accuracy,
'degradation': dir_accuracy - self.baseline.get('directional_accuracy', 0.55),
'n_predictions': window
}
def calculate_psi(self, train_distribution, current_values, n_bins=10):
"""Population Stability Index для feature drift"""
bins = np.percentile(train_distribution, np.linspace(0, 100, n_bins + 1))
bins[0] -= 1e-8
train_pct = np.ones(n_bins) / n_bins # равномерное по квантилям
current_hist = np.histogram(current_values, bins=bins)[0]
current_pct = np.clip(current_hist / current_hist.sum(), 1e-8, None)
psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
return psi
def detect_concept_drift(self, method='ks_test', alpha=0.05):
"""KS-test для сравнения распределений недавних и исторических предсказаний"""
if len(self.predictions_buffer) < 200:
return False, 1.0
preds = [p['prediction'] for p in self.predictions_buffer]
old_preds = preds[:100]
new_preds = preds[-100:]
if method == 'ks_test':
ks_stat, p_value = stats.ks_2samp(old_preds, new_preds)
return p_value < alpha, p_value
return False, 1.0
def check_all_alerts(self):
alerts = []
# 1. Performance degradation
perf = self.calculate_performance_metrics()
if perf and perf['degradation'] < -self.thresholds.get('max_accuracy_drop', 0.05):
alerts.append({
'type': 'performance_degradation',
'severity': 'HIGH',
'detail': f"Accuracy dropped {perf['degradation']:.3f} from baseline"
})
# 2. Feature drift
recent_features = list(self.features_buffer)[-100:]
if recent_features and self.baseline.get('feature_distributions'):
for feature_name in self.baseline['feature_distributions']:
current_vals = [f.get(feature_name) for f in recent_features if f.get(feature_name) is not None]
if current_vals:
psi = self.calculate_psi(
self.baseline['feature_distributions'][feature_name],
current_vals
)
if psi > 0.25:
alerts.append({
'type': 'feature_drift',
'severity': 'MEDIUM',
'feature': feature_name,
'psi': psi
})
# 3. Concept drift
drifted, p_val = self.detect_concept_drift()
if drifted:
alerts.append({
'type': 'concept_drift',
'severity': 'MEDIUM',
'p_value': p_val
})
return alerts
Grafana Dashboard
Ключевые panels:
-
Rolling Accuracy Chart: 100-точечное скользящее среднее directional accuracy. Baseline line для сравнения.
-
PSI Heatmap: матрица PSI по features × времени. Цвет: зелёный (< 0.1), жёлтый (0.1–0.2), красный (> 0.2).
-
Confidence Distribution: гистограмма confidence scores. Смещение в сторону 0.5 (неуверенность) — признак деградации.
-
Alert Timeline: лента алертов с severity и типом.
Алерт система
ALERT_CHANNELS = {
'HIGH': ['telegram', 'email', 'pagerduty'],
'MEDIUM': ['telegram', 'email'],
'LOW': ['telegram']
}
async def send_degradation_alert(alert, model_id):
message = f"""
⚠️ ML Model Degradation Alert
Model: {model_id}
Type: {alert['type']}
Severity: {alert['severity']}
Detail: {alert.get('detail', '')}
Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}
Action recommended: Check model retraining system
"""
channels = ALERT_CHANNELS.get(alert['severity'], ['telegram'])
for channel in channels:
await send_notification(channel, message)
Разрабатываем систему мониторинга деградации с PSI feature drift detection, performance tracking, concept drift testing, Grafana dashboard и многоуровневой alert системой.







