Реализация AI-анализа логов и инцидентов (AIOps)

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
Реализация AI-анализа логов и инцидентов (AIOps)
Сложная
~1-2 недели
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    853
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

Реализация AI-анализа логов и инцидентов (AIOps)

AIOps-анализ логов — это связующее звено между сырыми лог-данными и actionable insights для SRE-команды. Система в реальном времени обрабатывает миллиарды строк, выделяет сигнал из шума и строит временную картину инцидента с причинно-следственными связями.

Масштаб задачи

Типичные объёмы:

  • Микросервисная система из 100 сервисов: 5-50 GB логов в час
  • Kubernetes-кластер: тысячи pod'ов, события каждые секунды
  • Требование latency: аномалия должна быть выявлена < 2 минут после появления

Стек обработки:

Applications/Infra
    → Fluent Bit (lightweight collector, edge filtering)
    → Kafka (буферизация, партиционирование по сервису)
    → Flink / Spark Streaming (обработка)
    → ClickHouse (аналитика) + Elasticsearch (поиск)
    → ML Service (inference)
    → Grafana / Custom UI

Log Structuring Pipeline

Multi-format парсинг:

import re
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ParsedLog:
    timestamp: datetime
    level: str
    service: str
    trace_id: str
    message: str
    parsed_fields: dict

class MultiFormatLogParser:
    PATTERNS = {
        'nginx': r'(?P<ip>\S+) .* \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<proto>\S+)" (?P<status>\d+) (?P<bytes>\d+)',
        'java_log4j': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) (?P<level>\w+) (?P<class>\S+) - (?P<message>.*)',
        'python_logging': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) (?P<logger>\S+): (?P<message>.*)',
        'json': None  # прямой json.loads
    }

    def parse(self, raw_line, format_hint=None):
        # Попытка JSON сначала
        try:
            import json
            data = json.loads(raw_line)
            return self.normalize_json_log(data)
        except:
            pass

        # Попытка regex паттернов
        for fmt, pattern in self.PATTERNS.items():
            if pattern is None:
                continue
            match = re.match(pattern, raw_line)
            if match:
                return self.normalize_regex_log(match.groupdict(), fmt)

        # Fallback: неструктурированная строка
        return ParsedLog(
            timestamp=datetime.now(),
            level=self.detect_level(raw_line),
            service='unknown',
            trace_id=None,
            message=raw_line,
            parsed_fields={}
        )

Intelligent Alerting

Multi-level scoring:

class IntelligentAlertScorer:
    def __init__(self):
        self.severity_weights = {
            'ERROR': 3, 'CRITICAL': 5, 'FATAL': 10,
            'WARN': 1, 'WARNING': 1,
            'INFO': 0
        }

    def compute_alert_score(self, log_window, service_profile):
        """
        Комплексный скор: серьёзность × аномальность × важность сервиса
        """
        # Серьёзность ошибок
        error_score = sum(
            self.severity_weights.get(log.level, 0)
            for log in log_window
        )

        # Аномальность: отношение к baseline
        baseline_errors = service_profile['baseline_error_rate_per_minute']
        current_errors = sum(1 for log in log_window if log.level in ['ERROR', 'CRITICAL'])
        spike_ratio = current_errors / (baseline_errors * len(log_window) + 1)

        # Важность сервиса (критичность для бизнеса)
        business_criticality = service_profile.get('criticality', 1)

        return error_score * np.log1p(spike_ratio) * business_criticality

Контекстуальное подавление:

def should_suppress_alert(alert, suppression_rules):
    """
    Подавление известных ложных тревог:
    - Maintenance window
    - Known flapping services
    - Planned deployment events
    """
    now = datetime.now()

    for rule in suppression_rules:
        if (rule['service'] == alert.service and
            rule['start'] <= now <= rule['end'] and
            rule['pattern'] in alert.message):
            return True, rule['reason']

    return False, None

Incident Timeline Construction

Хронологическая реконструкция:

def build_incident_timeline(correlated_logs, metrics, deploys, threshold_events):
    """
    Строим полную картину инцидента из всех источников данных
    """
    events = []

    # Из логов: первые ошибки, их нарастание
    for log in correlated_logs:
        events.append({
            'time': log.timestamp,
            'type': 'log_event',
            'service': log.service,
            'description': log.message[:200],
            'severity': log.level
        })

    # Из метрик: когда началось ухудшение
    for metric_anomaly in metrics:
        events.append({
            'time': metric_anomaly.timestamp,
            'type': 'metric_anomaly',
            'metric': metric_anomaly.name,
            'value': metric_anomaly.value,
            'baseline': metric_anomaly.baseline
        })

    # Деплои и изменения конфигурации
    for deploy in deploys:
        events.append({
            'time': deploy.timestamp,
            'type': 'deployment',
            'service': deploy.service,
            'version': deploy.new_version,
            'changed_by': deploy.author
        })

    return sorted(events, key=lambda x: x['time'])

Automatic Runbook Matching

Семантический поиск по runbook-базе:

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

class RunbookMatcher:
    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.index = None
        self.runbooks = []

    def index_runbooks(self, runbooks):
        """
        Индексация runbook базы для семантического поиска
        """
        self.runbooks = runbooks
        texts = [f"{rb['title']} {rb['description']} {' '.join(rb['symptoms'])}"
                 for rb in runbooks]
        embeddings = self.encoder.encode(texts)

        dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(dimension)
        self.index.add(embeddings.astype(np.float32))

    def find_relevant_runbooks(self, incident_description, top_k=3):
        """
        По описанию инцидента → похожие runbooks
        """
        query_embedding = self.encoder.encode([incident_description])
        distances, indices = self.index.search(
            query_embedding.astype(np.float32), top_k
        )

        return [
            {
                'runbook': self.runbooks[idx],
                'similarity': 1 / (1 + distance)
            }
            for idx, distance in zip(indices[0], distances[0])
        ]

Generative AI для Incident Summary

Автоматическое резюме инцидента:

def generate_incident_brief(incident_timeline, correlated_services, root_cause_candidates, llm):
    prompt = f"""
    Analyze this incident and provide a concise summary for the on-call engineer:

    Timeline of events:
    {format_timeline(incident_timeline[:20])}  # первые 20 событий

    Affected services: {', '.join(correlated_services)}
    Probable root cause: {root_cause_candidates[0] if root_cause_candidates else 'Unknown'}

    Provide:
    1. One-sentence executive summary
    2. Key symptoms observed
    3. Probable root cause with confidence
    4. Recommended immediate actions
    5. Estimated impact
    """

    return llm.invoke(prompt)

War Room автоматизация:

  • Slack: автосоздание инцидент-канала с участниками от затронутых команд
  • Confluence: автоматическое создание PIR (Post-Incident Review) документа
  • Jira: создание тикетов с тегами причинных факторов

Учётность и обучение на инцидентах

MTTD/MTTR Tracking:

def calculate_incident_metrics(incident_log):
    """
    MTTD: Mean Time to Detect
    MTTR: Mean Time to Resolve
    MTTA: Mean Time to Acknowledge
    """
    return {
        'mttd_minutes': (incident_log['detected_at'] - incident_log['started_at']).total_seconds() / 60,
        'mtta_minutes': (incident_log['acknowledged_at'] - incident_log['detected_at']).total_seconds() / 60,
        'mttr_minutes': (incident_log['resolved_at'] - incident_log['started_at']).total_seconds() / 60
    }

Сроки: Kafka pipeline + log parsing + alert scoring + Slack интеграция — 4-5 недель. Timeline builder, runbook matcher, LLM incident summary, MTTD/MTTR analytics, Jira/Confluence автоматизация — 3-4 месяца.