Реализация детекции аномалий во временных рядах
Детекция аномалий во временных рядах — задача с множеством подходов и ни одним универсальным решением. Выбор алгоритма определяется типом аномалии (точечная, контекстуальная, коллективная), доступностью меток и вычислительными ограничениями. Практическая система объединяет несколько методов для минимизации пропусков и ложных срабатываний.
Типология аномалий
Точечные аномалии (Point Anomalies): Единичное значение резко выбивается из ряда. Пример: прочтение температурного датчика 200°C при норме 50°C.
Контекстуальные аномалии: Значение нормально само по себе, но аномально в данном контексте. Пример: температура 35°C в январе (норма летом, аномалия зимой).
Коллективные аномалии: Последовательность значений нормальна по отдельности, но аномальна вместе. Пример: несколько стандартных транзакций, образующих паттерн мошенничества.
Статистические методы
Z-Score и MAD:
import numpy as np
from scipy.stats import median_abs_deviation
def zscore_anomalies(series, threshold=3.0):
"""
Z-score: хорошо для нормально распределённых данных
"""
z_scores = np.abs((series - series.mean()) / series.std())
return z_scores > threshold
def mad_anomalies(series, threshold=3.5):
"""
MAD (Median Absolute Deviation): устойчив к выбросам в обучающих данных
Предпочтительнее z-score для данных с артефактами
"""
median = np.median(series)
mad = median_abs_deviation(series)
modified_z = 0.6745 * (series - median) / mad
return np.abs(modified_z) > threshold
CUSUM для постепенных изменений:
def cusum_detector(series, k=0.5, h=5.0):
"""
CUSUM: накапливает отклонения → детектирует shift в среднем
k: reference value (чувствительность)
h: threshold (порог срабатывания)
"""
mean = series[:50].mean() # baseline на начале ряда
std = series[:50].std()
S_pos = np.zeros(len(series))
S_neg = np.zeros(len(series))
for t in range(1, len(series)):
xi = (series[t] - mean) / std
S_pos[t] = max(0, S_pos[t-1] + xi - k)
S_neg[t] = max(0, S_neg[t-1] - xi - k)
return (S_pos > h) | (S_neg > h)
STL-декомпозиция + residual detection:
from statsmodels.tsa.seasonal import STL
def stl_anomaly_detection(series, period=24, threshold=3.5):
"""
Разложение на тренд + сезонность + остаток
Аномалия = большой остаток
"""
stl = STL(series, period=period, robust=True)
result = stl.fit()
residuals = result.resid
# MAD на остатках
mad = median_abs_deviation(residuals)
modified_z = np.abs(0.6745 * (residuals - np.median(residuals)) / mad)
return modified_z > threshold, result
ML-методы
Isolation Forest:
from sklearn.ensemble import IsolationForest
def isolation_forest_detector(series, contamination=0.05, window=10):
"""
Isolation Forest: эффективен на многомерных данных
contamination: ожидаемая доля аномалий
window: размер скользящего окна для создания фич
"""
# Создание оконных фич
features = []
for i in range(window, len(series)):
window_data = series[i-window:i]
features.append([
window_data.mean(),
window_data.std(),
window_data.max() - window_data.min(),
window_data[-1] - window_data.mean(), # current deviation
np.corrcoef(np.arange(window), window_data)[0,1] # trend
])
features = np.array(features)
iso_forest = IsolationForest(contamination=contamination, random_state=42)
predictions = iso_forest.fit_predict(features)
# -1 = аномалия, 1 = нормально
return predictions == -1
LSTM Autoencoder:
import torch
import torch.nn as nn
class LSTMAutoencoder(nn.Module):
def __init__(self, input_size, hidden_size=64, num_layers=2):
super().__init__()
# Encoder
self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
# Decoder
self.decoder = nn.LSTM(hidden_size, input_size, num_layers, batch_first=True)
def forward(self, x):
# Encode
_, (h_n, c_n) = self.encoder(x)
# Decode: repeat hidden state for decoder input
decoder_input = h_n[-1].unsqueeze(1).repeat(1, x.size(1), 1)
reconstruction, _ = self.decoder(decoder_input)
return reconstruction
def detect_autoencoder_anomalies(model, series, threshold_quantile=0.95):
"""
Reconstruction error как мера аномальности
Высокий RE = модель не может восстановить паттерн = аномалия
"""
with torch.no_grad():
reconstruction = model(series)
re = torch.mean((series - reconstruction)**2, dim=[1, 2])
threshold = torch.quantile(re, threshold_quantile)
return re > threshold
Online (потоковая) детекция
Streaming Anomaly Detection:
from collections import deque
import numpy as np
class OnlineAnomalyDetector:
"""
Полностью онлайн: работает без накопления истории в памяти
Обновляет статистику при каждой новой точке
"""
def __init__(self, window_size=200, threshold=3.5):
self.window = deque(maxlen=window_size)
self.threshold = threshold
self.n = 0
self.mean = 0
self.M2 = 0 # Welford's algorithm для online variance
def update(self, value):
self.window.append(value)
self.n += 1
# Welford's online mean and variance
delta = value - self.mean
self.mean += delta / self.n
delta2 = value - self.mean
self.M2 += delta * delta2
variance = self.M2 / (self.n - 1) if self.n > 1 else 0
std = np.sqrt(variance)
if std > 0 and self.n > 30: # warmup period
z_score = abs(value - self.mean) / std
return z_score > self.threshold
return False
Оценка качества детектора
Метрики при наличии меток:
from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score
def evaluate_detector(y_true, y_pred, y_scores=None):
metrics = {
'precision': precision_score(y_true, y_pred),
'recall': recall_score(y_true, y_pred),
'f1': f1_score(y_true, y_pred),
}
if y_scores is not None:
metrics['average_precision'] = average_precision_score(y_true, y_scores)
return metrics
Без меток — относительная оценка:
- False Positive Rate: доля времени, когда система в "аномалии" при нормальном режиме
- Alert fatigue: если > 5% времени генерируются алерты — система слишком чувствительна
- Operational feedback: инженеры помечают алерты как true/false positive → continuously improved model
Практические сценарии
Метрики инфраструктуры: Prometheus метрики → STL декомпозиция + Isolation Forest. Основная проблема: деплои создают ложные аномалии. Решение: suppress detection window ±10 минут от деплоя.
Финансовые транзакции: Высокое class imbalance (аномалии < 0.1%). LSTM Autoencoder или Isolation Forest лучше supervised методов.
Промышленные датчики: Часто имеют физически обусловленные ограничения → threshold + statistical hybrid.
Сроки: STL + Isolation Forest + online Z-score + базовый дашборд — 3-4 недели. LSTM Autoencoder, streaming detection, feedback loop для переобучения, multi-sensor fusion — 2-3 месяца.







