Разработка AI-системы детекции аномалий в телеком-сети

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
Разработка AI-системы детекции аномалий в телеком-сети
Средняя
~1-2 недели
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    853
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

Разработка AI-системы детекции аномалий в телеком-сети

Телеком-сеть генерирует миллионы метрик в минуту. Традиционные статические пороги — 80% CPU, packet loss > 1% — не улавливают тонкие аномалии: медленный дрейф, коррелированные деградации по нескольким KPI, нетипичный паттерн трафика. ML-детекция работает без заданных порогов, адаптируясь к нормальному поведению каждого элемента.

Многомерная аномалия на уровне сети

Почему статические пороги недостаточны:

  • Нормальный CPU маршрутизатора в пиковое время = 75% (не аномалия)
  • CPU 50% в 3 ночи в субботу = аномалия (возможна атака или утечка памяти)
  • Одновременная деградация 5 KPI на одном элементе = аномалия, хотя каждый отдельно в норме

Контекстно-зависимые пороги:

import pandas as pd
import numpy as np
from prophet import Prophet

class ContextualAnomalyDetector:
    def __init__(self, kpi_name: str):
        self.kpi_name = kpi_name
        self.prophet_model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            interval_width=0.99
        )
        self.fitted = False

    def fit(self, historical_data: pd.DataFrame):
        """
        historical_data: DataFrame с колонками ds (datetime), y (значение KPI)
        Минимум 4 недели истории для корректной сезонности.
        """
        self.prophet_model.fit(historical_data)
        self.fitted = True

    def detect(self, current_value: float, current_time: pd.Timestamp) -> dict:
        future = pd.DataFrame({'ds': [current_time]})
        forecast = self.prophet_model.predict(future)

        yhat = forecast['yhat'].values[0]
        yhat_lower = forecast['yhat_lower'].values[0]
        yhat_upper = forecast['yhat_upper'].values[0]

        is_anomaly = current_value < yhat_lower or current_value > yhat_upper
        deviation = (current_value - yhat) / (abs(yhat) + 1e-9)

        return {
            'kpi': self.kpi_name,
            'value': current_value,
            'expected': yhat,
            'bounds': (yhat_lower, yhat_upper),
            'anomaly': is_anomaly,
            'relative_deviation': deviation
        }

Multivariate Anomaly Detection

Isolation Forest на векторе метрик узла:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class NetworkElementAnomalyDetector:
    """
    Каждый сетевой элемент имеет свою модель Isolation Forest.
    Обучение: 30 дней нормальной работы.
    Инференс: каждые 5 минут на текущем векторе KPI.
    """
    def __init__(self, element_id: str, contamination=0.01):
        self.element_id = element_id
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=100,
            random_state=42
        )

    def fit(self, normal_kpi_matrix: np.ndarray):
        """
        normal_kpi_matrix: (N_samples × N_kpis)
        """
        X = self.scaler.fit_transform(normal_kpi_matrix)
        self.model.fit(X)
        # Калибровка порога на нормальных данных
        scores = self.model.score_samples(X)
        self.threshold = np.percentile(scores, 1)  # 1% ложных срабатываний

    def score(self, kpi_vector: np.ndarray) -> dict:
        X = self.scaler.transform([kpi_vector])
        raw_score = self.model.score_samples(X)[0]
        anomaly_score = -raw_score  # выше = аномальнее

        return {
            'element_id': self.element_id,
            'anomaly_score': float(anomaly_score),
            'is_anomaly': raw_score < self.threshold,
            'severity': self._score_to_severity(anomaly_score)
        }

    def _score_to_severity(self, score):
        if score > 0.7: return 'critical'
        if score > 0.5: return 'major'
        if score > 0.3: return 'minor'
        return 'normal'

Traffic Pattern Anomaly

Детекция нетипичного трафика (DDoS, BGP hijack):

def detect_traffic_anomaly(traffic_matrix: pd.DataFrame,
                            baseline_stats: dict) -> list:
    """
    traffic_matrix: src_ip × dst_ip × bytes за 5 минут (NetFlow/IPFIX)
    Аномалии трафика: объёмные (DDoS), структурные (BGP hijack), протокольные
    """
    anomalies = []

    # 1. Объёмная аномалия: резкий рост входящего трафика
    current_total = traffic_matrix['bytes'].sum()
    baseline_total = baseline_stats['total_bytes_mean']
    baseline_std = baseline_stats['total_bytes_std']

    volume_z_score = (current_total - baseline_total) / (baseline_std + 1e-9)
    if volume_z_score > 5:
        anomalies.append({
            'type': 'volumetric_spike',
            'severity': 'critical',
            'z_score': volume_z_score,
            'possible_cause': 'DDoS attack or flash crowd'
        })

    # 2. Новые источники: IP, которых не было в baseline
    current_sources = set(traffic_matrix['src_ip'].unique())
    known_sources = baseline_stats.get('known_src_ips', set())
    new_sources = current_sources - known_sources
    if len(new_sources) > baseline_stats.get('new_ip_threshold', 1000):
        anomalies.append({
            'type': 'new_source_flood',
            'severity': 'major',
            'new_ips_count': len(new_sources)
        })

    # 3. Протокольная аномалия: рост ICMP или UDP flood
    protocol_ratios = traffic_matrix.groupby('protocol')['bytes'].sum() / current_total
    for proto in ['ICMP', 'UDP']:
        if protocol_ratios.get(proto, 0) > 0.5:
            anomalies.append({
                'type': f'{proto}_flood',
                'severity': 'major',
                'ratio': protocol_ratios[proto]
            })

    return anomalies

BGP и маршрутизация

Детекция BGP anomaly:

def analyze_bgp_events(bgp_updates: pd.DataFrame, baseline_prefix_count: int) -> dict:
    """
    BGP hijack: внезапное появление нового AS-path для известного префикса.
    BGP leak: маршруты от одного провайдера рекламируются другому.
    Route flap: частые обновления = нестабильность соединения.
    """
    # Route flapping
    prefix_update_counts = bgp_updates.groupby('prefix').size()
    flapping_prefixes = prefix_update_counts[prefix_update_counts > 10].index.tolist()

    # Новые AS-origin для известных префиксов
    known_origins = {}  # prefix → expected AS
    hijack_candidates = []
    for _, row in bgp_updates.iterrows():
        if row['prefix'] in known_origins:
            if row['origin_as'] != known_origins[row['prefix']]:
                hijack_candidates.append({
                    'prefix': row['prefix'],
                    'expected_as': known_origins[row['prefix']],
                    'detected_as': row['origin_as']
                })

    return {
        'flapping_prefixes': flapping_prefixes,
        'hijack_candidates': hijack_candidates,
        'route_instability': len(flapping_prefixes) > 5
    }

Alert Correlation и подавление шума

Граф корреляций аномалий: При сбое аплинка маршрутизатора → сотни downstream аномалий. Алгоритм: строим граф зависимостей из топологии CMDB → определяем upstream источник → группируем в один инцидент.

Сроки: Prophet контекстуальная аномалия + Isolation Forest на узлах + traffic anomaly — 3-4 недели. BGP anomaly, alert correlation граф, автоматический RCA, NOC интеграция — 2-3 месяца.