Разработка AI-системы детекции аномалий в телеком-сети
Телеком-сеть генерирует миллионы метрик в минуту. Традиционные статические пороги — 80% CPU, packet loss > 1% — не улавливают тонкие аномалии: медленный дрейф, коррелированные деградации по нескольким KPI, нетипичный паттерн трафика. ML-детекция работает без заданных порогов, адаптируясь к нормальному поведению каждого элемента.
Многомерная аномалия на уровне сети
Почему статические пороги недостаточны:
- Нормальный CPU маршрутизатора в пиковое время = 75% (не аномалия)
- CPU 50% в 3 ночи в субботу = аномалия (возможна атака или утечка памяти)
- Одновременная деградация 5 KPI на одном элементе = аномалия, хотя каждый отдельно в норме
Контекстно-зависимые пороги:
import pandas as pd
import numpy as np
from prophet import Prophet
class ContextualAnomalyDetector:
def __init__(self, kpi_name: str):
self.kpi_name = kpi_name
self.prophet_model = Prophet(
daily_seasonality=True,
weekly_seasonality=True,
interval_width=0.99
)
self.fitted = False
def fit(self, historical_data: pd.DataFrame):
"""
historical_data: DataFrame с колонками ds (datetime), y (значение KPI)
Минимум 4 недели истории для корректной сезонности.
"""
self.prophet_model.fit(historical_data)
self.fitted = True
def detect(self, current_value: float, current_time: pd.Timestamp) -> dict:
future = pd.DataFrame({'ds': [current_time]})
forecast = self.prophet_model.predict(future)
yhat = forecast['yhat'].values[0]
yhat_lower = forecast['yhat_lower'].values[0]
yhat_upper = forecast['yhat_upper'].values[0]
is_anomaly = current_value < yhat_lower or current_value > yhat_upper
deviation = (current_value - yhat) / (abs(yhat) + 1e-9)
return {
'kpi': self.kpi_name,
'value': current_value,
'expected': yhat,
'bounds': (yhat_lower, yhat_upper),
'anomaly': is_anomaly,
'relative_deviation': deviation
}
Multivariate Anomaly Detection
Isolation Forest на векторе метрик узла:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class NetworkElementAnomalyDetector:
"""
Каждый сетевой элемент имеет свою модель Isolation Forest.
Обучение: 30 дней нормальной работы.
Инференс: каждые 5 минут на текущем векторе KPI.
"""
def __init__(self, element_id: str, contamination=0.01):
self.element_id = element_id
self.scaler = StandardScaler()
self.model = IsolationForest(
contamination=contamination,
n_estimators=100,
random_state=42
)
def fit(self, normal_kpi_matrix: np.ndarray):
"""
normal_kpi_matrix: (N_samples × N_kpis)
"""
X = self.scaler.fit_transform(normal_kpi_matrix)
self.model.fit(X)
# Калибровка порога на нормальных данных
scores = self.model.score_samples(X)
self.threshold = np.percentile(scores, 1) # 1% ложных срабатываний
def score(self, kpi_vector: np.ndarray) -> dict:
X = self.scaler.transform([kpi_vector])
raw_score = self.model.score_samples(X)[0]
anomaly_score = -raw_score # выше = аномальнее
return {
'element_id': self.element_id,
'anomaly_score': float(anomaly_score),
'is_anomaly': raw_score < self.threshold,
'severity': self._score_to_severity(anomaly_score)
}
def _score_to_severity(self, score):
if score > 0.7: return 'critical'
if score > 0.5: return 'major'
if score > 0.3: return 'minor'
return 'normal'
Traffic Pattern Anomaly
Детекция нетипичного трафика (DDoS, BGP hijack):
def detect_traffic_anomaly(traffic_matrix: pd.DataFrame,
baseline_stats: dict) -> list:
"""
traffic_matrix: src_ip × dst_ip × bytes за 5 минут (NetFlow/IPFIX)
Аномалии трафика: объёмные (DDoS), структурные (BGP hijack), протокольные
"""
anomalies = []
# 1. Объёмная аномалия: резкий рост входящего трафика
current_total = traffic_matrix['bytes'].sum()
baseline_total = baseline_stats['total_bytes_mean']
baseline_std = baseline_stats['total_bytes_std']
volume_z_score = (current_total - baseline_total) / (baseline_std + 1e-9)
if volume_z_score > 5:
anomalies.append({
'type': 'volumetric_spike',
'severity': 'critical',
'z_score': volume_z_score,
'possible_cause': 'DDoS attack or flash crowd'
})
# 2. Новые источники: IP, которых не было в baseline
current_sources = set(traffic_matrix['src_ip'].unique())
known_sources = baseline_stats.get('known_src_ips', set())
new_sources = current_sources - known_sources
if len(new_sources) > baseline_stats.get('new_ip_threshold', 1000):
anomalies.append({
'type': 'new_source_flood',
'severity': 'major',
'new_ips_count': len(new_sources)
})
# 3. Протокольная аномалия: рост ICMP или UDP flood
protocol_ratios = traffic_matrix.groupby('protocol')['bytes'].sum() / current_total
for proto in ['ICMP', 'UDP']:
if protocol_ratios.get(proto, 0) > 0.5:
anomalies.append({
'type': f'{proto}_flood',
'severity': 'major',
'ratio': protocol_ratios[proto]
})
return anomalies
BGP и маршрутизация
Детекция BGP anomaly:
def analyze_bgp_events(bgp_updates: pd.DataFrame, baseline_prefix_count: int) -> dict:
"""
BGP hijack: внезапное появление нового AS-path для известного префикса.
BGP leak: маршруты от одного провайдера рекламируются другому.
Route flap: частые обновления = нестабильность соединения.
"""
# Route flapping
prefix_update_counts = bgp_updates.groupby('prefix').size()
flapping_prefixes = prefix_update_counts[prefix_update_counts > 10].index.tolist()
# Новые AS-origin для известных префиксов
known_origins = {} # prefix → expected AS
hijack_candidates = []
for _, row in bgp_updates.iterrows():
if row['prefix'] in known_origins:
if row['origin_as'] != known_origins[row['prefix']]:
hijack_candidates.append({
'prefix': row['prefix'],
'expected_as': known_origins[row['prefix']],
'detected_as': row['origin_as']
})
return {
'flapping_prefixes': flapping_prefixes,
'hijack_candidates': hijack_candidates,
'route_instability': len(flapping_prefixes) > 5
}
Alert Correlation и подавление шума
Граф корреляций аномалий: При сбое аплинка маршрутизатора → сотни downstream аномалий. Алгоритм: строим граф зависимостей из топологии CMDB → определяем upstream источник → группируем в один инцидент.
Сроки: Prophet контекстуальная аномалия + Isolation Forest на узлах + traffic anomaly — 3-4 недели. BGP anomaly, alert correlation граф, автоматический RCA, NOC интеграция — 2-3 месяца.







