Обучение модели детекции аномалий
Детекция аномалий — область, где выбор модели и стратегии обучения определяет практическую пригодность системы. Supervised, semi-supervised и unsupervised подходы имеют принципиально разные требования к данным и области применения.
Выбор стратегии обучения
Принятие решения:
| Доступные данные | Рекомендуемый подход |
|---|---|
| Размеченные аномалии (< 1%) | Imbalanced supervised или Cost-sensitive |
| Только нормальные данные | One-class SVM / Autoencoder / SVDD |
| Нет разметки вообще | Unsupervised: Isolation Forest, LOF |
| Временные ряды с сезонностью | STL + residual detection |
| Последовательности событий | LSTM Autoencoder |
Аксиома реального мира: Истинные аномалии редки (< 0.1-1%). Модель, предсказывающая "нормально" всегда, даёт 99.9% accuracy. Precision/Recall/F1 — правильные метрики, не accuracy.
Supervised обучение при несбалансированных данных
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, average_precision_score
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTETomek
def train_with_imbalance(X_train, y_train):
"""
Несколько стратегий работы с class imbalance
"""
# Стратегия 1: class_weight='balanced'
rf_balanced = RandomForestClassifier(
n_estimators=300,
class_weight='balanced',
random_state=42
)
# Стратегия 2: SMOTE oversampling (только для tabular data)
smote = SMOTETomek(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
rf_smote = RandomForestClassifier(n_estimators=300)
# Стратегия 3: Cost-sensitive LightGBM
import lightgbm as lgb
n_pos = y_train.sum()
n_neg = len(y_train) - n_pos
scale_pos_weight = n_neg / n_pos # вес аномалий
lgbm_cost = lgb.LGBMClassifier(
scale_pos_weight=scale_pos_weight,
n_estimators=300
)
return rf_balanced, (rf_smote, X_resampled, y_resampled), lgbm_cost
One-Class Classification (только нормальные данные)
One-Class SVM:
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
def train_one_class_svm(normal_data, contamination=0.01):
"""
nu = верхняя граница доли false positives на обучающих данных
= нижняя граница support vectors
"""
scaler = StandardScaler()
X_scaled = scaler.fit_transform(normal_data)
ocsvm = OneClassSVM(
kernel='rbf',
nu=contamination, # 1-5% ожидаемых аномалий
gamma='scale'
)
ocsvm.fit(X_scaled)
return ocsvm, scaler
def detect_anomalies_ocsvm(model, scaler, new_data):
X_scaled = scaler.transform(new_data)
predictions = model.predict(X_scaled) # +1 = нормально, -1 = аномалия
scores = model.score_samples(X_scaled) # чем ниже, тем аномальнее
return predictions == -1, scores
Deep SVDD (Deep Support Vector Data Description):
import torch
import torch.nn as nn
class DeepSVDD(nn.Module):
"""
Neural network версия One-Class SVM
Обучается компактно отображать нормальные данные в гиперсферу
Аномалии = далеко от центра сферы
"""
def __init__(self, input_dim, latent_dim=64):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, latent_dim)
)
self.center = None
def init_center(self, dataloader, eps=0.1):
"""Инициализация центра гиперсферы"""
embeddings = []
with torch.no_grad():
for x, in dataloader:
embeddings.append(self.encoder(x))
center = torch.mean(torch.cat(embeddings), dim=0)
# Избегаем коллапса к нулю
center[(abs(center) < eps) & (center < 0)] = -eps
center[(abs(center) < eps) & (center >= 0)] = eps
self.center = center
def forward(self, x):
z = self.encoder(x)
return torch.sum((z - self.center)**2, dim=1) # расстояние до центра
Autoencoder подход
Reconstruction Error как мера аномальности:
class AnomalyAutoencoder(nn.Module):
def __init__(self, input_dim, latent_dim=32):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.BatchNorm1d(128),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, latent_dim)
)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 64),
nn.ReLU(),
nn.BatchNorm1d(64),
nn.Linear(64, 128),
nn.ReLU(),
nn.Linear(128, input_dim)
)
def forward(self, x):
z = self.encoder(x)
x_reconstructed = self.decoder(z)
return x_reconstructed
def anomaly_score(self, x):
with torch.no_grad():
x_rec = self(x)
re = torch.mean((x - x_rec)**2, dim=1)
return re
# Порог: например 95-й перцентиль reconstruction error на validation set
Оценка и валидация
Правильные метрики:
from sklearn.metrics import (
precision_recall_curve,
average_precision_score,
roc_auc_score,
confusion_matrix
)
def evaluate_anomaly_detector(y_true, y_scores, threshold=None):
"""
Для несбалансированного класса: AUC-PR важнее AUC-ROC
"""
# AUCPR (Area Under Precision-Recall curve)
aucpr = average_precision_score(y_true, y_scores)
# AUC-ROC
aucroc = roc_auc_score(y_true, y_scores)
# При конкретном пороге
if threshold is not None:
y_pred = (y_scores >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
return {
'aucpr': aucpr,
'aucroc': aucroc,
'precision': tp / (tp + fp + 1e-10),
'recall': tp / (tp + fn + 1e-10),
'f1': 2*tp / (2*tp + fp + fn + 1e-10),
'false_positive_rate': fp / (fp + tn + 1e-10)
}
return {'aucpr': aucpr, 'aucroc': aucroc}
Threshold Selection:
def find_optimal_threshold(y_true, y_scores, beta=1.0):
"""
F_beta score: beta > 1 приоритизирует recall (не пропустить аномалию)
beta < 1 приоритизирует precision (меньше ложных тревог)
"""
from sklearn.metrics import fbeta_score
thresholds = np.linspace(0, 1, 200)
scores = [fbeta_score(y_true, y_scores >= t, beta=beta)
for t in thresholds]
optimal_threshold = thresholds[np.argmax(scores)]
return optimal_threshold
Continuous Learning Pipeline
Feedback loop:
class AnomalyModelPipeline:
def __init__(self, model, feedback_buffer_size=1000):
self.model = model
self.feedback = [] # накопленная обратная связь
def predict(self, features):
score = self.model.anomaly_score(features)
return score
def receive_feedback(self, prediction_id, is_true_positive):
"""
Инженер помечает алерт: true positive / false positive
"""
self.feedback.append({
'prediction_id': prediction_id,
'label': 1 if is_true_positive else 0,
'timestamp': datetime.now()
})
if len(self.feedback) >= self.feedback_buffer_size:
self.retrain()
def retrain(self):
"""
Периодическое дообучение на накопленных метках
"""
X_feedback = retrieve_features(self.feedback)
y_feedback = [f['label'] for f in self.feedback]
self.model.partial_fit(X_feedback, y_feedback)
self.feedback = []
mlflow.log_metric('retrain_samples', len(X_feedback))
MLflow для tracking:
- Параметры: threshold, contamination, latent_dim
- Метрики: AUCPR, false positive rate, detection rate
- Artifacts: модель, ROC/PR curves
Сроки: baseline Isolation Forest + одна supervised модель + evaluation metrics — 2-3 недели. Autoencoder, One-Class SVM, feedback loop, MLflow tracking, production deployment — 6-8 недель.







