Разработка AI-системы обнаружения аномалий в работе оборудования
Обнаружение аномалий оборудования — задача, где ложные отрицательные (пропущенный отказ) обходятся дороже ложных положительных (лишняя проверка). Архитектура системы строится вокруг этого приоритета: несколько детекторов с разными методами, консенсус для высокой специфичности, ранний сигнал для критичных активов.
Многоуровневая архитектура детекции
Уровни обнаружения аномалий:
| Уровень | Метод | Задержка | Тип аномалии |
|---|---|---|---|
| L1: Threshold | Статические пороги по ISO/ГОСТ | мс | Грубые нарушения |
| L2: Statistical | EWMA, CUSUM, 3σ правила | с | Медленный дрейф |
| L3: ML Unsupervised | Isolation Forest, Autoencoder | мин | Многомерные паттерны |
| L4: Supervised | XGBoost на размеченных отказах | мин | Известные типы отказов |
| L5: Physics | Модель нормального поведения актива | ч | Отклонение от физической модели |
Комбинация уровней: L1/L2 — для немедленных алертов, L3/L4 — для ранней диагностики, L5 — для долгосрочного тренда.
Feature Engineering для оборудования
Временные и частотные признаки:
import numpy as np
from scipy import stats, signal
def extract_equipment_features(raw_signal, sampling_rate=1000):
"""
Многодоменные признаки из сырого сигнала датчика (вибрация/ток/давление)
"""
# Временная область
features = {
'rms': np.sqrt(np.mean(raw_signal**2)),
'peak': np.max(np.abs(raw_signal)),
'crest_factor': np.max(np.abs(raw_signal)) / np.sqrt(np.mean(raw_signal**2)),
'kurtosis': stats.kurtosis(raw_signal),
'skewness': stats.skew(raw_signal),
'peak_to_peak': np.ptp(raw_signal),
'shape_factor': np.sqrt(np.mean(raw_signal**2)) / np.mean(np.abs(raw_signal))
}
# Частотная область
freqs, psd = signal.welch(raw_signal, fs=sampling_rate, nperseg=512)
total_power = np.trapz(psd, freqs)
# Энергия в диапазонах (Гц)
bands = [(0, 100), (100, 500), (500, 2000), (2000, 5000)]
for low, high in bands:
mask = (freqs >= low) & (freqs < high)
band_power = np.trapz(psd[mask], freqs[mask])
features[f'band_power_{low}_{high}'] = band_power / total_power
# Доминирующая частота
features['dominant_freq'] = freqs[np.argmax(psd)]
features['spectral_centroid'] = np.sum(freqs * psd) / np.sum(psd)
return features
Дельта-признаки (изменения во времени):
def compute_delta_features(current_features, baseline_features, trend_features_7d):
"""
Важно не абсолютное значение, а отклонение от нормы конкретного актива
"""
deltas = {}
for key in current_features:
if key in baseline_features:
deltas[f'{key}_delta_abs'] = current_features[key] - baseline_features[key]
if baseline_features[key] != 0:
deltas[f'{key}_delta_pct'] = (
(current_features[key] - baseline_features[key]) /
abs(baseline_features[key]) * 100
)
# Тренд за 7 дней
if key in trend_features_7d:
deltas[f'{key}_trend_7d'] = trend_features_7d[key] # slope из linregress
return deltas
Unsupervised Anomaly Detection
Isolation Forest с адаптацией к сезонности:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd
class EquipmentAnomalyDetector:
def __init__(self, contamination=0.02):
self.scaler = StandardScaler()
self.model = IsolationForest(
contamination=contamination,
n_estimators=200,
random_state=42
)
self.baseline_built = False
def fit_baseline(self, normal_operation_features: pd.DataFrame):
"""
Обучение на исторически нормальных данных конкретного актива
Минимум 30 дней нормальной работы
"""
X = self.scaler.fit_transform(normal_operation_features)
self.model.fit(X)
self.baseline_built = True
# Порог: 5-й перцентиль аномальных скоров на нормальных данных
scores = self.model.score_samples(X)
self.threshold = np.percentile(scores, 5)
def detect(self, current_features: dict) -> dict:
if not self.baseline_built:
return {'status': 'no_baseline', 'anomaly': False}
X = self.scaler.transform([list(current_features.values())])
score = self.model.score_samples(X)[0]
is_anomaly = score < self.threshold
return {
'anomaly_score': float(-score), # инвертируем: выше = аномальнее
'anomaly': bool(is_anomaly),
'severity': self._classify_severity(-score)
}
def _classify_severity(self, anomaly_score):
if anomaly_score > 0.8: return 'critical'
if anomaly_score > 0.6: return 'high'
if anomaly_score > 0.4: return 'medium'
return 'low'
Autoencoder для мультисенсорных данных
LSTM Autoencoder — реконструкционная ошибка как аномалия:
import torch
import torch.nn as nn
class SensorAutoencoder(nn.Module):
def __init__(self, input_dim, hidden_dim=32, latent_dim=8, seq_len=60):
super().__init__()
self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
self.bottleneck = nn.Linear(hidden_dim, latent_dim)
self.decoder = nn.LSTM(latent_dim, hidden_dim, batch_first=True)
self.output_layer = nn.Linear(hidden_dim, input_dim)
def forward(self, x):
# x: (batch, seq_len, input_dim)
enc_out, _ = self.encoder(x)
z = self.bottleneck(enc_out[:, -1, :]) # последний hidden state
# Разворачиваем latent для декодера
z_expanded = z.unsqueeze(1).repeat(1, x.shape[1], 1)
dec_out, _ = self.decoder(z_expanded)
reconstruction = self.output_layer(dec_out)
return reconstruction
def detect_anomaly_autoencoder(model, sensor_window, threshold_percentile=95):
"""
threshold устанавливается из нормальных данных валидационного набора
"""
with torch.no_grad():
reconstruction = model(sensor_window)
error = torch.mean((sensor_window - reconstruction)**2, dim=[1, 2])
return error.item()
Алертинг и подавление шума
Многоуровневый консенсус:
def consensus_anomaly_decision(l2_statistical, l3_isolation_forest,
l4_supervised, asset_criticality):
"""
Разные детекторы ловят разные типы аномалий.
Консенсус снижает false positive rate.
"""
votes = sum([
1 if l2_statistical['anomaly'] else 0,
1 if l3_isolation_forest['anomaly'] else 0,
1 if l4_supervised.get('anomaly', False) else 0
])
severity = max(
l2_statistical.get('severity', 'none'),
l3_isolation_forest.get('severity', 'none'),
key=lambda s: {'none': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}[s]
)
# Критичное оборудование: достаточно 1 детектора
# Стандартное: нужно 2 детектора
threshold = 1 if asset_criticality >= 4 else 2
return {
'alert': votes >= threshold,
'confidence': votes / 3,
'severity': severity,
'detectors_triggered': votes
}
Дедупликация коррелированных алертов: Один сбой насоса → аномалия давления + температуры + расхода. Группировка по времени (±5 мин) и топологии (смежные активы) формирует один инцидент с набором сигналов.
Дрейф модели и переобучение
Детекция concept drift:
from scipy.stats import ks_2samp
def detect_model_drift(recent_features, baseline_features, p_threshold=0.01):
"""
KS-тест: если распределение признаков сдвинулось — модель устарела
Триггер для переобучения на новых данных
"""
drift_features = []
for col in recent_features.columns:
stat, p_value = ks_2samp(baseline_features[col], recent_features[col])
if p_value < p_threshold:
drift_features.append(col)
drift_ratio = len(drift_features) / len(recent_features.columns)
return {
'drift_detected': drift_ratio > 0.3,
'drifted_features': drift_features,
'drift_ratio': drift_ratio
}
Автоматический pipeline: еженедельная проверка дрейфа → при drift_ratio > 0.3 запускается переобучение на данных последних 60 дней (если они размечены как нормальные по оперативному журналу).
Сроки: Isolation Forest + базовые признаки + EWMA алертинг + дашборд — 3-4 недели. Autoencoder с LSTM, мультисенсорный консенсус, drift detection, автопереобучение — 2-3 месяца.







