Разработка системы мониторинга деградации ML-модели

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка системы мониторинга деградации ML-модели
Средний
~5 дней
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка системы мониторинга деградации ML-модели

ML модели для торговли деградируют по нескольким причинам: рыночные режимы меняются, арбитражируются паттерны которые модель использовала, изменяются корреляции между активами. Мониторинг деградации позволяет обнаружить проблему до того, как она нанесёт финансовый ущерб.

Типы деградации

Concept drift: изменились отношения между features и target. Модель предсказывает правильно для «старого» рынка.

Data drift (feature drift): входные данные стали распределяться иначе, чем при обучении. Модель получает нетипичные inputs.

Label drift: само распределение target переменной изменилось (например, рынок из трендового стал боковым).

Performance degradation: качество предсказаний снизилось даже без явного drift.

Метрики для мониторинга

import numpy as np
import pandas as pd
from scipy import stats
from collections import deque

class ModelDegradationMonitor:
    def __init__(self, model_id, baseline_metrics, alert_thresholds):
        self.model_id = model_id
        self.baseline = baseline_metrics
        self.thresholds = alert_thresholds
        
        # Rolling windows для метрик
        self.predictions_buffer = deque(maxlen=500)
        self.actuals_buffer = deque(maxlen=500)
        self.features_buffer = deque(maxlen=1000)
    
    def log_prediction(self, features, prediction, confidence):
        self.predictions_buffer.append({
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': datetime.utcnow()
        })
        self.features_buffer.append(features)
    
    def log_actual(self, actual_return):
        self.actuals_buffer.append(actual_return)
    
    def calculate_performance_metrics(self, window=100):
        if len(self.predictions_buffer) < window:
            return None
        
        recent_preds = [p['prediction'] for p in list(self.predictions_buffer)[-window:]]
        recent_actuals = list(self.actuals_buffer)[-window:]
        
        if len(recent_actuals) < window:
            return None
        
        # Directional accuracy
        dir_accuracy = np.mean(
            np.sign(recent_preds) == np.sign(recent_actuals)
        )
        
        # Confidence calibration: при высокой уверенности должна быть высокая точность
        high_conf_preds = [
            (p['prediction'], a) 
            for p, a in zip(list(self.predictions_buffer)[-window:], recent_actuals)
            if p['confidence'] > 0.65
        ]
        
        if high_conf_preds:
            high_conf_accuracy = np.mean([
                np.sign(pred) == np.sign(actual) 
                for pred, actual in high_conf_preds
            ])
        else:
            high_conf_accuracy = None
        
        return {
            'directional_accuracy': dir_accuracy,
            'high_conf_accuracy': high_conf_accuracy,
            'degradation': dir_accuracy - self.baseline.get('directional_accuracy', 0.55),
            'n_predictions': window
        }
    
    def calculate_psi(self, train_distribution, current_values, n_bins=10):
        """Population Stability Index для feature drift"""
        bins = np.percentile(train_distribution, np.linspace(0, 100, n_bins + 1))
        bins[0] -= 1e-8
        
        train_pct = np.ones(n_bins) / n_bins  # равномерное по квантилям
        current_hist = np.histogram(current_values, bins=bins)[0]
        current_pct = np.clip(current_hist / current_hist.sum(), 1e-8, None)
        
        psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
        return psi
    
    def detect_concept_drift(self, method='ks_test', alpha=0.05):
        """KS-test для сравнения распределений недавних и исторических предсказаний"""
        if len(self.predictions_buffer) < 200:
            return False, 1.0
        
        preds = [p['prediction'] for p in self.predictions_buffer]
        old_preds = preds[:100]
        new_preds = preds[-100:]
        
        if method == 'ks_test':
            ks_stat, p_value = stats.ks_2samp(old_preds, new_preds)
            return p_value < alpha, p_value
        
        return False, 1.0
    
    def check_all_alerts(self):
        alerts = []
        
        # 1. Performance degradation
        perf = self.calculate_performance_metrics()
        if perf and perf['degradation'] < -self.thresholds.get('max_accuracy_drop', 0.05):
            alerts.append({
                'type': 'performance_degradation',
                'severity': 'HIGH',
                'detail': f"Accuracy dropped {perf['degradation']:.3f} from baseline"
            })
        
        # 2. Feature drift
        recent_features = list(self.features_buffer)[-100:]
        if recent_features and self.baseline.get('feature_distributions'):
            for feature_name in self.baseline['feature_distributions']:
                current_vals = [f.get(feature_name) for f in recent_features if f.get(feature_name) is not None]
                if current_vals:
                    psi = self.calculate_psi(
                        self.baseline['feature_distributions'][feature_name],
                        current_vals
                    )
                    if psi > 0.25:
                        alerts.append({
                            'type': 'feature_drift',
                            'severity': 'MEDIUM',
                            'feature': feature_name,
                            'psi': psi
                        })
        
        # 3. Concept drift
        drifted, p_val = self.detect_concept_drift()
        if drifted:
            alerts.append({
                'type': 'concept_drift',
                'severity': 'MEDIUM',
                'p_value': p_val
            })
        
        return alerts

Grafana Dashboard

Ключевые panels:

  • Rolling Accuracy Chart: 100-точечное скользящее среднее directional accuracy. Baseline line для сравнения.

  • PSI Heatmap: матрица PSI по features × времени. Цвет: зелёный (< 0.1), жёлтый (0.1–0.2), красный (> 0.2).

  • Confidence Distribution: гистограмма confidence scores. Смещение в сторону 0.5 (неуверенность) — признак деградации.

  • Alert Timeline: лента алертов с severity и типом.

Алерт система

ALERT_CHANNELS = {
    'HIGH': ['telegram', 'email', 'pagerduty'],
    'MEDIUM': ['telegram', 'email'],
    'LOW': ['telegram']
}

async def send_degradation_alert(alert, model_id):
    message = f"""
⚠️ ML Model Degradation Alert
Model: {model_id}
Type: {alert['type']}
Severity: {alert['severity']}
Detail: {alert.get('detail', '')}
Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}

Action recommended: Check model retraining system
"""
    channels = ALERT_CHANNELS.get(alert['severity'], ['telegram'])
    for channel in channels:
        await send_notification(channel, message)

Разрабатываем систему мониторинга деградации с PSI feature drift detection, performance tracking, concept drift testing, Grafana dashboard и многоуровневой alert системой.