Реализация Anomaly Detection для трафика на сайте

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Реализация Anomaly Detection для трафика на сайте
Сложная
~5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Обнаружение аномалий трафика в реальном времени

Аномалия трафика — отклонение от исторически нормального паттерна: резкий всплеск запросов, неожиданный рост ошибок, нетипичное распределение по endpoint'ам. Автоматическое обнаружение позволяет реагировать на DDoS, попытки скрейпинга и инфраструктурные сбои за секунды, а не часы.

Что считать аномалией

Объёмные аномалии: RPS, полоса пропускания, количество уникальных IP резко возрастают.

Структурные аномалии: соотношение HTTP-методов меняется (резкий рост GET при норме 70/30 GET/POST), рост доли запросов к конкретным endpoint'ам.

Качественные аномалии: доля ошибок 4xx/5xx растёт, p99 latency пробивает историческую норму, увеличивается доля 404 (сканирование).

Статистические методы обнаружения

import numpy as np
from collections import deque
import time

class TrafficAnomalyDetector:
    def __init__(self, window_size=60, sensitivity=3.0):
        """
        window_size: размер скользящего окна в точках (секунды/минуты)
        sensitivity: порог в сигмах (z-score)
        """
        self.window_size = window_size
        self.sensitivity = sensitivity
        self.metrics = {}  # {metric_name: deque of values}

    def _get_window(self, metric: str) -> deque:
        if metric not in self.metrics:
            self.metrics[metric] = deque(maxlen=self.window_size)
        return self.metrics[metric]

    def add_point(self, metric: str, value: float):
        """Добавить новую точку данных"""
        self.metrics.setdefault(metric, deque(maxlen=self.window_size)).append(value)

    def check(self, metric: str, current_value: float) -> dict:
        """Проверить является ли текущее значение аномалией"""
        window = self._get_window(metric)

        if len(window) < 10:
            # Недостаточно данных для анализа
            return {'anomaly': False, 'reason': 'insufficient_data'}

        values = list(window)
        mean = np.mean(values)
        std = np.std(values)

        if std == 0:
            z_score = 0 if current_value == mean else float('inf')
        else:
            z_score = abs(current_value - mean) / std

        is_anomaly = z_score > self.sensitivity
        direction = 'spike' if current_value > mean else 'drop'

        return {
            'anomaly': is_anomaly,
            'z_score': round(z_score, 2),
            'direction': direction if is_anomaly else None,
            'current': current_value,
            'baseline_mean': round(mean, 2),
            'baseline_std': round(std, 2),
            'threshold': round(mean + self.sensitivity * std, 2)
        }

Экспоненциальное сглаживание (EWMA)

Лучше реагирует на тренды, не чувствителен к единичным выбросам:

class EWMADetector:
    def __init__(self, alpha=0.1, k=3.0):
        """
        alpha: коэффициент сглаживания (0.05–0.2)
        k: количество стандартных отклонений для порога
        """
        self.alpha = alpha
        self.k = k
        self.ewma = {}   # {metric: {'mean': float, 'variance': float}}

    def update_and_check(self, metric: str, value: float) -> dict:
        if metric not in self.ewma:
            self.ewma[metric] = {'mean': value, 'variance': 0}
            return {'anomaly': False}

        state = self.ewma[metric]
        mean = state['mean']
        variance = state['variance']

        # Обновить EWMA mean и variance
        new_mean = self.alpha * value + (1 - self.alpha) * mean
        new_variance = (1 - self.alpha) * (variance + self.alpha * (value - mean) ** 2)

        state['mean'] = new_mean
        state['variance'] = new_variance

        std = np.sqrt(new_variance) if new_variance > 0 else 0
        threshold_high = new_mean + self.k * std
        threshold_low = max(0, new_mean - self.k * std)

        is_anomaly = value > threshold_high or value < threshold_low

        return {
            'anomaly': is_anomaly,
            'direction': 'spike' if value > threshold_high else 'drop',
            'current': value,
            'expected': round(new_mean, 2),
            'threshold_high': round(threshold_high, 2),
            'deviation_pct': round(abs(value - new_mean) / max(new_mean, 1) * 100, 1)
        }

Сбор метрик в реальном времени

import redis
from datetime import datetime
import threading

class MetricsCollector:
    def __init__(self, redis_client):
        self.r = redis_client
        self.detector = EWMADetector(alpha=0.1, k=3.5)
        self.alert_cooldown = {}  # предотвратить спам алертов

    def record_request(self, status_code: int, path: str,
                       latency_ms: float, method: str):
        """Вызывается в middleware для каждого запроса"""
        now = int(time.time())
        minute = now - (now % 60)

        pipe = self.r.pipeline()

        # RPS счётчик
        pipe.incr(f"metrics:rps:{now}")
        pipe.expire(f"metrics:rps:{now}", 300)

        # Ошибки
        if status_code >= 400:
            pipe.incr(f"metrics:errors:{now}")
            pipe.expire(f"metrics:errors:{now}", 300)

        # Latency (гистограмма в Redis)
        latency_bucket = int(latency_ms / 100) * 100
        pipe.hincrby(f"metrics:latency:{minute}", str(latency_bucket), 1)
        pipe.expire(f"metrics:latency:{minute}", 3600)

        # Счётчик по endpoint
        endpoint = f"{method}:{path.split('?')[0][:50]}"
        pipe.hincrby(f"metrics:endpoints:{minute}", endpoint, 1)
        pipe.expire(f"metrics:endpoints:{minute}", 3600)

        pipe.execute()

    def analyze_current_window(self):
        """Анализировать последние 60 секунд и возвращать аномалии"""
        now = int(time.time())
        anomalies = []

        # Собрать RPS за последние 60 секунд
        rps_values = []
        for i in range(60):
            t = now - i
            val = self.r.get(f"metrics:rps:{t}")
            rps_values.append(int(val or 0))

        current_rps = rps_values[0]

        # Обновить детектор историческими данными
        for v in reversed(rps_values[1:]):
            self.detector.update_and_check('rps', v)

        result = self.detector.update_and_check('rps', current_rps)
        if result['anomaly']:
            anomalies.append({
                'metric': 'rps',
                'severity': 'high' if result['deviation_pct'] > 200 else 'medium',
                **result
            })

        # Error rate
        total = sum(rps_values[:60]) or 1
        error_keys = [self.r.get(f"metrics:errors:{now-i}") for i in range(60)]
        total_errors = sum(int(v or 0) for v in error_keys)
        error_rate = total_errors / total

        err_result = self.detector.update_and_check('error_rate', error_rate)
        if err_result['anomaly'] and error_rate > 0.1:
            anomalies.append({
                'metric': 'error_rate',
                'severity': 'critical' if error_rate > 0.3 else 'high',
                **err_result
            })

        return anomalies

Алертинг и автоматические действия

class AnomalyAlertManager:
    def __init__(self, slack_webhook, pagerduty_key):
        self.slack = slack_webhook
        self.pd = pagerduty_key
        self.active_incidents = {}

    def handle_anomalies(self, anomalies: list):
        for anomaly in anomalies:
            key = f"{anomaly['metric']}_{anomaly['direction']}"

            # Cooldown: не спамить одним алертом
            if self.active_incidents.get(key, 0) > time.time() - 300:
                continue

            self.active_incidents[key] = time.time()

            if anomaly['severity'] == 'critical':
                self._page_oncall(anomaly)
                self._auto_mitigate(anomaly)
            elif anomaly['severity'] == 'high':
                self._notify_slack(anomaly)

    def _notify_slack(self, anomaly: dict):
        import requests
        icon = ':rotating_light:' if anomaly['direction'] == 'spike' else ':arrow_down:'
        requests.post(self.slack, json={
            'text': f"{icon} *Traffic anomaly detected*\n"
                    f"Metric: `{anomaly['metric']}`\n"
                    f"Current: `{anomaly['current']}` (expected: `{anomaly['expected']}`)\n"
                    f"Deviation: `+{anomaly['deviation_pct']}%`\n"
                    f"Z-score: `{anomaly.get('z_score', 'N/A')}`"
        })

    def _auto_mitigate(self, anomaly: dict):
        """Автоматические защитные действия при критических аномалиях"""
        if anomaly['metric'] == 'rps' and anomaly['direction'] == 'spike':
            # Включить защитный rate limit
            redis.setex('emergency_rate_limit', 300, '50')  # 50 req/s глобально
            # Уведомить Cloudflare включить Under Attack Mode через API
            self._enable_cloudflare_attack_mode()

    def _enable_cloudflare_attack_mode(self):
        import requests
        requests.patch(
            f"https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/settings/security_level",
            headers={'Authorization': f'Bearer {CF_API_TOKEN}'},
            json={'value': 'under_attack'}
        )

Prometheus + Grafana алертинг

# prometheus/alerts.yml
groups:
  - name: traffic_anomalies
    rules:
      - alert: RequestRateSpike
        expr: |
          rate(http_requests_total[1m]) >
          (avg_over_time(rate(http_requests_total[1m])[1h:1m]) * 3)
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Request rate spike: {{ $value }} req/s"

      - alert: ErrorRateCritical
        expr: |
          rate(http_requests_total{status=~"5.."}[5m]) /
          rate(http_requests_total[5m]) > 0.1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Error rate {{ $value | humanizePercentage }}"

      - alert: LatencyP99Spike
        expr: |
          histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 3m
        labels:
          severity: high
        annotations:
          summary: "P99 latency {{ $value }}s"

Срок выполнения

Реализация системы обнаружения аномалий трафика с EWMA-детектором, Prometheus-алертами и автоматическими защитными действиями — 3–5 рабочих дней.