Реализация AI-анализа логов и инцидентов (AIOps)
AIOps-анализ логов — это связующее звено между сырыми лог-данными и actionable insights для SRE-команды. Система в реальном времени обрабатывает миллиарды строк, выделяет сигнал из шума и строит временную картину инцидента с причинно-следственными связями.
Масштаб задачи
Типичные объёмы:
- Микросервисная система из 100 сервисов: 5-50 GB логов в час
- Kubernetes-кластер: тысячи pod'ов, события каждые секунды
- Требование latency: аномалия должна быть выявлена < 2 минут после появления
Стек обработки:
Applications/Infra
→ Fluent Bit (lightweight collector, edge filtering)
→ Kafka (буферизация, партиционирование по сервису)
→ Flink / Spark Streaming (обработка)
→ ClickHouse (аналитика) + Elasticsearch (поиск)
→ ML Service (inference)
→ Grafana / Custom UI
Log Structuring Pipeline
Multi-format парсинг:
import re
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ParsedLog:
timestamp: datetime
level: str
service: str
trace_id: str
message: str
parsed_fields: dict
class MultiFormatLogParser:
PATTERNS = {
'nginx': r'(?P<ip>\S+) .* \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<proto>\S+)" (?P<status>\d+) (?P<bytes>\d+)',
'java_log4j': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) (?P<level>\w+) (?P<class>\S+) - (?P<message>.*)',
'python_logging': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) (?P<logger>\S+): (?P<message>.*)',
'json': None # прямой json.loads
}
def parse(self, raw_line, format_hint=None):
# Попытка JSON сначала
try:
import json
data = json.loads(raw_line)
return self.normalize_json_log(data)
except:
pass
# Попытка regex паттернов
for fmt, pattern in self.PATTERNS.items():
if pattern is None:
continue
match = re.match(pattern, raw_line)
if match:
return self.normalize_regex_log(match.groupdict(), fmt)
# Fallback: неструктурированная строка
return ParsedLog(
timestamp=datetime.now(),
level=self.detect_level(raw_line),
service='unknown',
trace_id=None,
message=raw_line,
parsed_fields={}
)
Intelligent Alerting
Multi-level scoring:
class IntelligentAlertScorer:
def __init__(self):
self.severity_weights = {
'ERROR': 3, 'CRITICAL': 5, 'FATAL': 10,
'WARN': 1, 'WARNING': 1,
'INFO': 0
}
def compute_alert_score(self, log_window, service_profile):
"""
Комплексный скор: серьёзность × аномальность × важность сервиса
"""
# Серьёзность ошибок
error_score = sum(
self.severity_weights.get(log.level, 0)
for log in log_window
)
# Аномальность: отношение к baseline
baseline_errors = service_profile['baseline_error_rate_per_minute']
current_errors = sum(1 for log in log_window if log.level in ['ERROR', 'CRITICAL'])
spike_ratio = current_errors / (baseline_errors * len(log_window) + 1)
# Важность сервиса (критичность для бизнеса)
business_criticality = service_profile.get('criticality', 1)
return error_score * np.log1p(spike_ratio) * business_criticality
Контекстуальное подавление:
def should_suppress_alert(alert, suppression_rules):
"""
Подавление известных ложных тревог:
- Maintenance window
- Known flapping services
- Planned deployment events
"""
now = datetime.now()
for rule in suppression_rules:
if (rule['service'] == alert.service and
rule['start'] <= now <= rule['end'] and
rule['pattern'] in alert.message):
return True, rule['reason']
return False, None
Incident Timeline Construction
Хронологическая реконструкция:
def build_incident_timeline(correlated_logs, metrics, deploys, threshold_events):
"""
Строим полную картину инцидента из всех источников данных
"""
events = []
# Из логов: первые ошибки, их нарастание
for log in correlated_logs:
events.append({
'time': log.timestamp,
'type': 'log_event',
'service': log.service,
'description': log.message[:200],
'severity': log.level
})
# Из метрик: когда началось ухудшение
for metric_anomaly in metrics:
events.append({
'time': metric_anomaly.timestamp,
'type': 'metric_anomaly',
'metric': metric_anomaly.name,
'value': metric_anomaly.value,
'baseline': metric_anomaly.baseline
})
# Деплои и изменения конфигурации
for deploy in deploys:
events.append({
'time': deploy.timestamp,
'type': 'deployment',
'service': deploy.service,
'version': deploy.new_version,
'changed_by': deploy.author
})
return sorted(events, key=lambda x: x['time'])
Automatic Runbook Matching
Семантический поиск по runbook-базе:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
class RunbookMatcher:
def __init__(self):
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.index = None
self.runbooks = []
def index_runbooks(self, runbooks):
"""
Индексация runbook базы для семантического поиска
"""
self.runbooks = runbooks
texts = [f"{rb['title']} {rb['description']} {' '.join(rb['symptoms'])}"
for rb in runbooks]
embeddings = self.encoder.encode(texts)
dimension = embeddings.shape[1]
self.index = faiss.IndexFlatL2(dimension)
self.index.add(embeddings.astype(np.float32))
def find_relevant_runbooks(self, incident_description, top_k=3):
"""
По описанию инцидента → похожие runbooks
"""
query_embedding = self.encoder.encode([incident_description])
distances, indices = self.index.search(
query_embedding.astype(np.float32), top_k
)
return [
{
'runbook': self.runbooks[idx],
'similarity': 1 / (1 + distance)
}
for idx, distance in zip(indices[0], distances[0])
]
Generative AI для Incident Summary
Автоматическое резюме инцидента:
def generate_incident_brief(incident_timeline, correlated_services, root_cause_candidates, llm):
prompt = f"""
Analyze this incident and provide a concise summary for the on-call engineer:
Timeline of events:
{format_timeline(incident_timeline[:20])} # первые 20 событий
Affected services: {', '.join(correlated_services)}
Probable root cause: {root_cause_candidates[0] if root_cause_candidates else 'Unknown'}
Provide:
1. One-sentence executive summary
2. Key symptoms observed
3. Probable root cause with confidence
4. Recommended immediate actions
5. Estimated impact
"""
return llm.invoke(prompt)
War Room автоматизация:
- Slack: автосоздание инцидент-канала с участниками от затронутых команд
- Confluence: автоматическое создание PIR (Post-Incident Review) документа
- Jira: создание тикетов с тегами причинных факторов
Учётность и обучение на инцидентах
MTTD/MTTR Tracking:
def calculate_incident_metrics(incident_log):
"""
MTTD: Mean Time to Detect
MTTR: Mean Time to Resolve
MTTA: Mean Time to Acknowledge
"""
return {
'mttd_minutes': (incident_log['detected_at'] - incident_log['started_at']).total_seconds() / 60,
'mtta_minutes': (incident_log['acknowledged_at'] - incident_log['detected_at']).total_seconds() / 60,
'mttr_minutes': (incident_log['resolved_at'] - incident_log['started_at']).total_seconds() / 60
}
Сроки: Kafka pipeline + log parsing + alert scoring + Slack интеграция — 4-5 недель. Timeline builder, runbook matcher, LLM incident summary, MTTD/MTTR analytics, Jira/Confluence автоматизация — 3-4 месяца.







