Разработка MLOps-инфраструктуры для торговых моделей

Проектируем и разрабатываем блокчейн-решения полного цикла: от архитектуры смарт-контрактов до запуска DeFi-протоколов, NFT-маркетплейсов и криптобирж. Аудит безопасности, токеномика, интеграция с существующей инфраструктурой.
Показано 1 из 1Все 1306 услуг
Разработка MLOps-инфраструктуры для торговых моделей
Сложный
от 2 недель до 3 месяцев
Часто задаваемые вопросы

Направления блокчейн-разработки

Этапы блокчейн-разработки

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1288
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1198
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    902
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1122
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    589
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    859

Разработка MLOps-инфраструктуры для торговых моделей

MLOps — это DevOps для машинного обучения. Для торговых моделей это особенно критично: задержка в деплое новой версии или падение inference сервера стоят реальных денег. MLOps инфраструктура обеспечивает надёжное, воспроизводимое и автоматизированное управление полным жизненным циклом ML модели.

Компоненты MLOps для трейдинга

Data Layer          │  ML Layer           │  Serving Layer       │  Monitoring
                    │                     │                       │
ClickHouse (ticks)  │  MLflow (tracking)  │  FastAPI (inference)  │  Prometheus
PostgreSQL (trade)  │  DVC (data version) │  Redis (cache)        │  Grafana
S3/MinIO (raw data) │  Prefect (pipeline) │  Docker/K8s           │  Alertmanager
Feature Store       │  Optuna (HPO)       │  Load Balancer        │  PagerDuty

Experiment tracking с MLflow

import mlflow
import mlflow.sklearn
import mlflow.pytorch
from mlflow.models.signature import infer_signature

def train_with_mlflow_tracking(experiment_name, config, X_train, y_train, 
                                X_val, y_val, X_test, y_test):
    mlflow.set_experiment(experiment_name)
    
    with mlflow.start_run(run_name=f"{config['model_type']}_{config['version']}"):
        # Логируем параметры
        mlflow.log_params({
            'model_type': config['model_type'],
            'n_features': X_train.shape[1],
            'train_size': len(X_train),
            'val_size': len(X_val),
            **config.get('hyperparams', {})
        })
        
        # Обучение
        model = train_model(config, X_train, y_train, X_val, y_val)
        
        # Метрики
        val_metrics = evaluate_model(model, X_val, y_val)
        test_metrics = evaluate_model(model, X_test, y_test)
        
        mlflow.log_metrics({
            f'val_{k}': v for k, v in val_metrics.items()
        })
        mlflow.log_metrics({
            f'test_{k}': v for k, v in test_metrics.items()
        })
        
        # Сохраняем модель с signature
        signature = infer_signature(X_train[:10], model.predict_proba(X_train[:10]))
        mlflow.sklearn.log_model(
            model,
            'model',
            signature=signature,
            registered_model_name=f"crypto_{config['symbol']}_predictor"
        )
        
        # Артефакты: confusion matrix, feature importance plot
        import matplotlib.pyplot as plt
        fig = plot_feature_importance(model, X_train.columns)
        mlflow.log_figure(fig, 'feature_importance.png')
        
        run_id = mlflow.active_run().info.run_id
    
    return run_id, test_metrics

Data versioning с DVC

# dvc.yaml — pipeline определение
stages:
  fetch_data:
    cmd: python src/data/fetch_ohlcv.py --symbol BTC --days 730
    deps:
      - src/data/fetch_ohlcv.py
    outs:
      - data/raw/btc_ohlcv.parquet

  feature_engineering:
    cmd: python src/features/engineer.py
    deps:
      - src/features/engineer.py
      - data/raw/btc_ohlcv.parquet
    outs:
      - data/features/btc_features.parquet
    params:
      - params.yaml:
          - feature_engineering

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/btc_features.parquet
    outs:
      - models/btc_predictor.pkl
    metrics:
      - metrics/train_metrics.json
    params:
      - params.yaml:
          - training

CI/CD для ML с GitHub Actions

# .github/workflows/ml_pipeline.yml
name: ML Training Pipeline

on:
  schedule:
    - cron: '0 1 * * 0'  # Каждое воскресенье в 01:00 UTC
  workflow_dispatch:
    inputs:
      symbol:
        description: 'Trading symbol'
        default: 'BTC'

jobs:
  train:
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install -r requirements.txt
      
      - name: Pull data with DVC
        run: dvc pull data/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
      
      - name: Run training pipeline
        run: dvc repro
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      
      - name: Validate model
        run: python src/validate_model.py --min-accuracy 0.54 --min-sharpe 1.0
      
      - name: Deploy to production
        if: success()
        run: python src/deploy_model.py
        env:
          TRADING_API_KEY: ${{ secrets.TRADING_API }}

Kubernetes deployment для inference

# k8s/ml-inference-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: crypto-ml-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-inference
  template:
    spec:
      containers:
      - name: inference
        image: crypto-ml-inference:latest
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "2000m"
            memory: "4Gi"
        env:
        - name: MLFLOW_TRACKING_URI
          valueFrom:
            secretKeyRef:
              name: ml-secrets
              key: mlflow_uri
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 10

Prometheus метрики для ML

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Метрики inference сервера
prediction_counter = Counter('predictions_total', 'Total predictions', ['model', 'symbol'])
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency',
                               buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0])
model_accuracy_gauge = Gauge('model_directional_accuracy', 'Rolling accuracy', 
                              ['model', 'symbol', 'window'])
feature_drift_gauge = Gauge('feature_psi', 'PSI for feature drift', ['model', 'feature'])

@app.post("/predict")
async def predict(request: PredictionRequest):
    prediction_counter.labels(model=request.model_id, symbol=request.symbol).inc()
    
    with prediction_latency.time():
        result = await run_inference(request)
    
    return result

Feature Store с Feast

from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float32, Int64

# Определяем entities и features
crypto_entity = Entity(name='symbol', join_keys=['symbol'])

ohlcv_features = FeatureView(
    name='crypto_ohlcv_features',
    entities=[crypto_entity],
    ttl=timedelta(hours=2),
    schema=[
        Field(name='return_24h', dtype=Float32),
        Field(name='rsi_14', dtype=Float32),
        Field(name='bb_pos_20', dtype=Float32),
        Field(name='vol_ratio_24', dtype=Float32),
    ]
)

# Realtime serving
store = FeatureStore(repo_path='./feature_repo')

def get_realtime_features(symbol):
    features = store.get_online_features(
        features=['crypto_ohlcv_features:return_24h', 
                  'crypto_ohlcv_features:rsi_14'],
        entity_rows=[{'symbol': symbol}]
    )
    return features.to_dict()

Observability

Distributed tracing с OpenTelemetry: trace request от входящего рыночного данного до выходного торгового сигнала. Выявляем узкие места в pipeline.

Structured logging с structured JSON format, ELK stack для агрегации.

Cost monitoring: трекинг стоимости GPU compute на каждое обучение. Budget alerts.

Разрабатываем полноценную MLOps инфраструктуру: MLflow + DVC, CI/CD pipeline, Kubernetes deployment, Prometheus/Grafana мониторинг, Feast Feature Store и OpenTelemetry трасировка.