Обнаружение аномалий трафика в реальном времени
Аномалия трафика — отклонение от исторически нормального паттерна: резкий всплеск запросов, неожиданный рост ошибок, нетипичное распределение по endpoint'ам. Автоматическое обнаружение позволяет реагировать на DDoS, попытки скрейпинга и инфраструктурные сбои за секунды, а не часы.
Что считать аномалией
Объёмные аномалии: RPS, полоса пропускания, количество уникальных IP резко возрастают.
Структурные аномалии: соотношение HTTP-методов меняется (резкий рост GET при норме 70/30 GET/POST), рост доли запросов к конкретным endpoint'ам.
Качественные аномалии: доля ошибок 4xx/5xx растёт, p99 latency пробивает историческую норму, увеличивается доля 404 (сканирование).
Статистические методы обнаружения
import numpy as np
from collections import deque
import time
class TrafficAnomalyDetector:
def __init__(self, window_size=60, sensitivity=3.0):
"""
window_size: размер скользящего окна в точках (секунды/минуты)
sensitivity: порог в сигмах (z-score)
"""
self.window_size = window_size
self.sensitivity = sensitivity
self.metrics = {} # {metric_name: deque of values}
def _get_window(self, metric: str) -> deque:
if metric not in self.metrics:
self.metrics[metric] = deque(maxlen=self.window_size)
return self.metrics[metric]
def add_point(self, metric: str, value: float):
"""Добавить новую точку данных"""
self.metrics.setdefault(metric, deque(maxlen=self.window_size)).append(value)
def check(self, metric: str, current_value: float) -> dict:
"""Проверить является ли текущее значение аномалией"""
window = self._get_window(metric)
if len(window) < 10:
# Недостаточно данных для анализа
return {'anomaly': False, 'reason': 'insufficient_data'}
values = list(window)
mean = np.mean(values)
std = np.std(values)
if std == 0:
z_score = 0 if current_value == mean else float('inf')
else:
z_score = abs(current_value - mean) / std
is_anomaly = z_score > self.sensitivity
direction = 'spike' if current_value > mean else 'drop'
return {
'anomaly': is_anomaly,
'z_score': round(z_score, 2),
'direction': direction if is_anomaly else None,
'current': current_value,
'baseline_mean': round(mean, 2),
'baseline_std': round(std, 2),
'threshold': round(mean + self.sensitivity * std, 2)
}
Экспоненциальное сглаживание (EWMA)
Лучше реагирует на тренды, не чувствителен к единичным выбросам:
class EWMADetector:
def __init__(self, alpha=0.1, k=3.0):
"""
alpha: коэффициент сглаживания (0.05–0.2)
k: количество стандартных отклонений для порога
"""
self.alpha = alpha
self.k = k
self.ewma = {} # {metric: {'mean': float, 'variance': float}}
def update_and_check(self, metric: str, value: float) -> dict:
if metric not in self.ewma:
self.ewma[metric] = {'mean': value, 'variance': 0}
return {'anomaly': False}
state = self.ewma[metric]
mean = state['mean']
variance = state['variance']
# Обновить EWMA mean и variance
new_mean = self.alpha * value + (1 - self.alpha) * mean
new_variance = (1 - self.alpha) * (variance + self.alpha * (value - mean) ** 2)
state['mean'] = new_mean
state['variance'] = new_variance
std = np.sqrt(new_variance) if new_variance > 0 else 0
threshold_high = new_mean + self.k * std
threshold_low = max(0, new_mean - self.k * std)
is_anomaly = value > threshold_high or value < threshold_low
return {
'anomaly': is_anomaly,
'direction': 'spike' if value > threshold_high else 'drop',
'current': value,
'expected': round(new_mean, 2),
'threshold_high': round(threshold_high, 2),
'deviation_pct': round(abs(value - new_mean) / max(new_mean, 1) * 100, 1)
}
Сбор метрик в реальном времени
import redis
from datetime import datetime
import threading
class MetricsCollector:
def __init__(self, redis_client):
self.r = redis_client
self.detector = EWMADetector(alpha=0.1, k=3.5)
self.alert_cooldown = {} # предотвратить спам алертов
def record_request(self, status_code: int, path: str,
latency_ms: float, method: str):
"""Вызывается в middleware для каждого запроса"""
now = int(time.time())
minute = now - (now % 60)
pipe = self.r.pipeline()
# RPS счётчик
pipe.incr(f"metrics:rps:{now}")
pipe.expire(f"metrics:rps:{now}", 300)
# Ошибки
if status_code >= 400:
pipe.incr(f"metrics:errors:{now}")
pipe.expire(f"metrics:errors:{now}", 300)
# Latency (гистограмма в Redis)
latency_bucket = int(latency_ms / 100) * 100
pipe.hincrby(f"metrics:latency:{minute}", str(latency_bucket), 1)
pipe.expire(f"metrics:latency:{minute}", 3600)
# Счётчик по endpoint
endpoint = f"{method}:{path.split('?')[0][:50]}"
pipe.hincrby(f"metrics:endpoints:{minute}", endpoint, 1)
pipe.expire(f"metrics:endpoints:{minute}", 3600)
pipe.execute()
def analyze_current_window(self):
"""Анализировать последние 60 секунд и возвращать аномалии"""
now = int(time.time())
anomalies = []
# Собрать RPS за последние 60 секунд
rps_values = []
for i in range(60):
t = now - i
val = self.r.get(f"metrics:rps:{t}")
rps_values.append(int(val or 0))
current_rps = rps_values[0]
# Обновить детектор историческими данными
for v in reversed(rps_values[1:]):
self.detector.update_and_check('rps', v)
result = self.detector.update_and_check('rps', current_rps)
if result['anomaly']:
anomalies.append({
'metric': 'rps',
'severity': 'high' if result['deviation_pct'] > 200 else 'medium',
**result
})
# Error rate
total = sum(rps_values[:60]) or 1
error_keys = [self.r.get(f"metrics:errors:{now-i}") for i in range(60)]
total_errors = sum(int(v or 0) for v in error_keys)
error_rate = total_errors / total
err_result = self.detector.update_and_check('error_rate', error_rate)
if err_result['anomaly'] and error_rate > 0.1:
anomalies.append({
'metric': 'error_rate',
'severity': 'critical' if error_rate > 0.3 else 'high',
**err_result
})
return anomalies
Алертинг и автоматические действия
class AnomalyAlertManager:
def __init__(self, slack_webhook, pagerduty_key):
self.slack = slack_webhook
self.pd = pagerduty_key
self.active_incidents = {}
def handle_anomalies(self, anomalies: list):
for anomaly in anomalies:
key = f"{anomaly['metric']}_{anomaly['direction']}"
# Cooldown: не спамить одним алертом
if self.active_incidents.get(key, 0) > time.time() - 300:
continue
self.active_incidents[key] = time.time()
if anomaly['severity'] == 'critical':
self._page_oncall(anomaly)
self._auto_mitigate(anomaly)
elif anomaly['severity'] == 'high':
self._notify_slack(anomaly)
def _notify_slack(self, anomaly: dict):
import requests
icon = ':rotating_light:' if anomaly['direction'] == 'spike' else ':arrow_down:'
requests.post(self.slack, json={
'text': f"{icon} *Traffic anomaly detected*\n"
f"Metric: `{anomaly['metric']}`\n"
f"Current: `{anomaly['current']}` (expected: `{anomaly['expected']}`)\n"
f"Deviation: `+{anomaly['deviation_pct']}%`\n"
f"Z-score: `{anomaly.get('z_score', 'N/A')}`"
})
def _auto_mitigate(self, anomaly: dict):
"""Автоматические защитные действия при критических аномалиях"""
if anomaly['metric'] == 'rps' and anomaly['direction'] == 'spike':
# Включить защитный rate limit
redis.setex('emergency_rate_limit', 300, '50') # 50 req/s глобально
# Уведомить Cloudflare включить Under Attack Mode через API
self._enable_cloudflare_attack_mode()
def _enable_cloudflare_attack_mode(self):
import requests
requests.patch(
f"https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/settings/security_level",
headers={'Authorization': f'Bearer {CF_API_TOKEN}'},
json={'value': 'under_attack'}
)
Prometheus + Grafana алертинг
# prometheus/alerts.yml
groups:
- name: traffic_anomalies
rules:
- alert: RequestRateSpike
expr: |
rate(http_requests_total[1m]) >
(avg_over_time(rate(http_requests_total[1m])[1h:1m]) * 3)
for: 2m
labels:
severity: critical
annotations:
summary: "Request rate spike: {{ $value }} req/s"
- alert: ErrorRateCritical
expr: |
rate(http_requests_total{status=~"5.."}[5m]) /
rate(http_requests_total[5m]) > 0.1
for: 1m
labels:
severity: critical
annotations:
summary: "Error rate {{ $value | humanizePercentage }}"
- alert: LatencyP99Spike
expr: |
histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m])) > 2
for: 3m
labels:
severity: high
annotations:
summary: "P99 latency {{ $value }}s"
Срок выполнения
Реализация системы обнаружения аномалий трафика с EWMA-детектором, Prometheus-алертами и автоматическими защитными действиями — 3–5 рабочих дней.







