Обучение модели детекции аномалий

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
Обучение модели детекции аномалий
Средняя
~5 рабочих дней
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    853
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

Обучение модели детекции аномалий

Детекция аномалий — область, где выбор модели и стратегии обучения определяет практическую пригодность системы. Supervised, semi-supervised и unsupervised подходы имеют принципиально разные требования к данным и области применения.

Выбор стратегии обучения

Принятие решения:

Доступные данные Рекомендуемый подход
Размеченные аномалии (< 1%) Imbalanced supervised или Cost-sensitive
Только нормальные данные One-class SVM / Autoencoder / SVDD
Нет разметки вообще Unsupervised: Isolation Forest, LOF
Временные ряды с сезонностью STL + residual detection
Последовательности событий LSTM Autoencoder

Аксиома реального мира: Истинные аномалии редки (< 0.1-1%). Модель, предсказывающая "нормально" всегда, даёт 99.9% accuracy. Precision/Recall/F1 — правильные метрики, не accuracy.

Supervised обучение при несбалансированных данных

import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, average_precision_score
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTETomek

def train_with_imbalance(X_train, y_train):
    """
    Несколько стратегий работы с class imbalance
    """
    # Стратегия 1: class_weight='balanced'
    rf_balanced = RandomForestClassifier(
        n_estimators=300,
        class_weight='balanced',
        random_state=42
    )

    # Стратегия 2: SMOTE oversampling (только для tabular data)
    smote = SMOTETomek(random_state=42)
    X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
    rf_smote = RandomForestClassifier(n_estimators=300)

    # Стратегия 3: Cost-sensitive LightGBM
    import lightgbm as lgb
    n_pos = y_train.sum()
    n_neg = len(y_train) - n_pos
    scale_pos_weight = n_neg / n_pos  # вес аномалий

    lgbm_cost = lgb.LGBMClassifier(
        scale_pos_weight=scale_pos_weight,
        n_estimators=300
    )

    return rf_balanced, (rf_smote, X_resampled, y_resampled), lgbm_cost

One-Class Classification (только нормальные данные)

One-Class SVM:

from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

def train_one_class_svm(normal_data, contamination=0.01):
    """
    nu = верхняя граница доли false positives на обучающих данных
    = нижняя граница support vectors
    """
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(normal_data)

    ocsvm = OneClassSVM(
        kernel='rbf',
        nu=contamination,  # 1-5% ожидаемых аномалий
        gamma='scale'
    )
    ocsvm.fit(X_scaled)

    return ocsvm, scaler

def detect_anomalies_ocsvm(model, scaler, new_data):
    X_scaled = scaler.transform(new_data)
    predictions = model.predict(X_scaled)  # +1 = нормально, -1 = аномалия
    scores = model.score_samples(X_scaled)  # чем ниже, тем аномальнее
    return predictions == -1, scores

Deep SVDD (Deep Support Vector Data Description):

import torch
import torch.nn as nn

class DeepSVDD(nn.Module):
    """
    Neural network версия One-Class SVM
    Обучается компактно отображать нормальные данные в гиперсферу
    Аномалии = далеко от центра сферы
    """
    def __init__(self, input_dim, latent_dim=64):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim)
        )
        self.center = None

    def init_center(self, dataloader, eps=0.1):
        """Инициализация центра гиперсферы"""
        embeddings = []
        with torch.no_grad():
            for x, in dataloader:
                embeddings.append(self.encoder(x))
        center = torch.mean(torch.cat(embeddings), dim=0)
        # Избегаем коллапса к нулю
        center[(abs(center) < eps) & (center < 0)] = -eps
        center[(abs(center) < eps) & (center >= 0)] = eps
        self.center = center

    def forward(self, x):
        z = self.encoder(x)
        return torch.sum((z - self.center)**2, dim=1)  # расстояние до центра

Autoencoder подход

Reconstruction Error как мера аномальности:

class AnomalyAutoencoder(nn.Module):
    def __init__(self, input_dim, latent_dim=32):
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )

        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        return x_reconstructed

    def anomaly_score(self, x):
        with torch.no_grad():
            x_rec = self(x)
            re = torch.mean((x - x_rec)**2, dim=1)
        return re

# Порог: например 95-й перцентиль reconstruction error на validation set

Оценка и валидация

Правильные метрики:

from sklearn.metrics import (
    precision_recall_curve,
    average_precision_score,
    roc_auc_score,
    confusion_matrix
)

def evaluate_anomaly_detector(y_true, y_scores, threshold=None):
    """
    Для несбалансированного класса: AUC-PR важнее AUC-ROC
    """
    # AUCPR (Area Under Precision-Recall curve)
    aucpr = average_precision_score(y_true, y_scores)

    # AUC-ROC
    aucroc = roc_auc_score(y_true, y_scores)

    # При конкретном пороге
    if threshold is not None:
        y_pred = (y_scores >= threshold).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

        return {
            'aucpr': aucpr,
            'aucroc': aucroc,
            'precision': tp / (tp + fp + 1e-10),
            'recall': tp / (tp + fn + 1e-10),
            'f1': 2*tp / (2*tp + fp + fn + 1e-10),
            'false_positive_rate': fp / (fp + tn + 1e-10)
        }

    return {'aucpr': aucpr, 'aucroc': aucroc}

Threshold Selection:

def find_optimal_threshold(y_true, y_scores, beta=1.0):
    """
    F_beta score: beta > 1 приоритизирует recall (не пропустить аномалию)
    beta < 1 приоритизирует precision (меньше ложных тревог)
    """
    from sklearn.metrics import fbeta_score
    thresholds = np.linspace(0, 1, 200)
    scores = [fbeta_score(y_true, y_scores >= t, beta=beta)
              for t in thresholds]
    optimal_threshold = thresholds[np.argmax(scores)]
    return optimal_threshold

Continuous Learning Pipeline

Feedback loop:

class AnomalyModelPipeline:
    def __init__(self, model, feedback_buffer_size=1000):
        self.model = model
        self.feedback = []  # накопленная обратная связь

    def predict(self, features):
        score = self.model.anomaly_score(features)
        return score

    def receive_feedback(self, prediction_id, is_true_positive):
        """
        Инженер помечает алерт: true positive / false positive
        """
        self.feedback.append({
            'prediction_id': prediction_id,
            'label': 1 if is_true_positive else 0,
            'timestamp': datetime.now()
        })

        if len(self.feedback) >= self.feedback_buffer_size:
            self.retrain()

    def retrain(self):
        """
        Периодическое дообучение на накопленных метках
        """
        X_feedback = retrieve_features(self.feedback)
        y_feedback = [f['label'] for f in self.feedback]
        self.model.partial_fit(X_feedback, y_feedback)
        self.feedback = []
        mlflow.log_metric('retrain_samples', len(X_feedback))

MLflow для tracking:

  • Параметры: threshold, contamination, latent_dim
  • Метрики: AUCPR, false positive rate, detection rate
  • Artifacts: модель, ROC/PR curves

Сроки: baseline Isolation Forest + одна supervised модель + evaluation metrics — 2-3 недели. Autoencoder, One-Class SVM, feedback loop, MLflow tracking, production deployment — 6-8 недель.