Разработка системы автоматического переобучения моделей

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка системы автоматического переобучения моделей
Сложный
~1-2 недели
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка системы автоматического переобучения моделей

ML модели для трейдинга деградируют со временем: рыночные режимы меняются, отношения между переменными сдвигаются. Система автоматического переобучения детектирует деградацию и запускает новое обучение без ручного вмешательства.

Триггеры переобучения

Performance-based trigger: директional accuracy модели упала ниже threshold за rolling window:

class RetrainingTrigger:
    def __init__(self, performance_threshold=0.52, window_days=14, 
                 min_predictions=100):
        self.threshold = performance_threshold
        self.window = window_days
        self.min_predictions = min_predictions
    
    def should_retrain(self, recent_predictions, recent_actuals):
        if len(recent_predictions) < self.min_predictions:
            return False, 'insufficient_data'
        
        accuracy = np.mean(
            np.sign(recent_predictions) == np.sign(recent_actuals)
        )
        
        if accuracy < self.threshold:
            return True, f'accuracy_{accuracy:.3f}_below_{self.threshold}'
        
        return False, 'performance_ok'
    
    def check_data_drift(self, train_features, current_features):
        """Population Stability Index (PSI) для feature drift"""
        psi_values = {}
        
        for col in train_features.columns:
            # Делим на 10 бинов по обучающим данным
            bins = np.percentile(train_features[col].dropna(), 
                                np.linspace(0, 100, 11))
            bins[0] -= 1e-8
            
            train_counts = np.histogram(train_features[col], bins=bins)[0]
            current_counts = np.histogram(current_features[col], bins=bins)[0]
            
            # PSI
            train_pct = train_counts / train_counts.sum()
            current_pct = current_counts / current_counts.sum()
            
            # Avoid log(0)
            train_pct = np.clip(train_pct, 1e-8, None)
            current_pct = np.clip(current_pct, 1e-8, None)
            
            psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
            psi_values[col] = psi
        
        # PSI > 0.2 = значительный drift
        max_psi = max(psi_values.values())
        n_drifted = sum(1 for v in psi_values.values() if v > 0.2)
        
        return {
            'max_psi': max_psi,
            'n_drifted_features': n_drifted,
            'should_retrain': max_psi > 0.25 or n_drifted > 3,
            'psi_by_feature': psi_values
        }
    
    def check_scheduled(self, last_training_date, retrain_frequency_days=7):
        """Плановое переобучение по расписанию"""
        days_since_training = (datetime.utcnow() - last_training_date).days
        return days_since_training >= retrain_frequency_days

Автоматический training pipeline

import mlflow
from prefect import flow, task

@task
def fetch_training_data(symbol, lookback_days=365):
    """Загружаем данные для переобучения"""
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=lookback_days)
    # Загружаем из ClickHouse/PostgreSQL
    return load_ohlcv_data(symbol, start_date, end_date)

@task
def prepare_features(raw_data):
    """Feature engineering"""
    from feature_pipeline import FeatureEngineer
    engineer = FeatureEngineer()
    return engineer.create_all_features(raw_data)

@task
def train_and_evaluate(features_df, target_col, model_config):
    """Обучение модели с walk-forward validation"""
    from training import WalkForwardTrainer
    
    trainer = WalkForwardTrainer(
        n_splits=5,
        test_size=60,  # 60 дней тестовой выборки
        gap=24  # gap между train и test (часы)
    )
    
    with mlflow.start_run():
        model, metrics = trainer.fit_evaluate(features_df, target_col, model_config)
        
        # Логируем метрики в MLflow
        mlflow.log_metrics(metrics)
        mlflow.log_params(model_config)
        mlflow.sklearn.log_model(model, 'model')
        
        run_id = mlflow.active_run().info.run_id
    
    return model, metrics, run_id

@task
def validate_and_promote(model, metrics, run_id, min_metrics):
    """Проверяем качество и решаем о деплое"""
    passes_validation = (
        metrics.get('directional_accuracy', 0) >= min_metrics['accuracy'] and
        metrics.get('sharpe_ratio', 0) >= min_metrics['sharpe'] and
        metrics.get('max_drawdown', 1) <= min_metrics['max_drawdown']
    )
    
    if passes_validation:
        # Регистрируем как новую Production версию
        client = mlflow.tracking.MlflowClient()
        model_version = client.create_model_version(
            name='crypto_predictor',
            source=f'runs:/{run_id}/model',
            run_id=run_id
        )
        client.transition_model_version_stage(
            'crypto_predictor', model_version.version, 'Production'
        )
        return True, model_version.version
    
    return False, None

@flow(name="model_retraining_pipeline")
def retrain_model_pipeline(symbol, model_config, min_metrics):
    raw_data = fetch_training_data(symbol)
    features_df = prepare_features(raw_data)
    model, metrics, run_id = train_and_evaluate(features_df, 'target', model_config)
    promoted, version = validate_and_promote(model, metrics, run_id, min_metrics)
    
    return {'promoted': promoted, 'version': version, 'metrics': metrics}

Zero-downtime модель обновление

При успешном обучении новой модели нужно заменить старую без остановки торговли:

class ModelHotSwapper:
    def __init__(self):
        self.current_model = None
        self.model_version = None
        self._lock = asyncio.Lock()
    
    async def swap_model(self, new_model, new_version):
        """Thread-safe замена модели"""
        async with self._lock:
            old_model = self.current_model
            old_version = self.model_version
            
            self.current_model = new_model
            self.model_version = new_version
            
            # Логируем смену модели
            logger.info(f"Model swapped: {old_version} -> {new_version}")
            
            # Старую модель можно выгрузить из памяти
            del old_model
    
    async def predict(self, features):
        async with self._lock:
            return self.current_model.predict(features)

Расписание переобучения

Prefect или Airflow для оркестрации:

Ежедневно в 00:00 UTC:
    1. Проверка performance trigger
    2. Проверка PSI drift trigger
    3. Проверка schedule trigger (если > 7 дней с последнего обучения)
    → Если хотя бы один trigger сработал → запуск retraining pipeline
    → При успешном обучении → hot swap модели
    → Уведомление в Telegram: "Модель обновлена: v15 → v16, accuracy 0.567"

Разрабатываем систему авто-переобучения с PSI drift detection, performance monitoring trigger, Prefect/Airflow оркестрацией, MLflow tracking и zero-downtime hot swap.