Реализация AI-анализа бизнес-метрик (BI AI Copilot)
BI AI Copilot превращает корпоративные дашборды из статичных отчётов в интерактивный диалог. Аналитик задаёт вопрос на русском или английском — система запрашивает метрики из хранилища, интерпретирует аномалии и предлагает следующие шаги. Архитектурно это RAG поверх метаданных метрик плюс Text-to-SQL для оперативной аналитики.
Архитектура BI Copilot
[Вопрос пользователя]
↓
[Metrics Catalog RAG] ← определение релевантных метрик
↓
[Text-to-SQL / Metric Query] ← запрос к хранилищу
↓
[Anomaly & Trend Detection] ← числовой анализ результатов
↓
[LLM Narration + Suggestions] ← объяснение + следующие шаги
↓
[Proactive Alerts] ← push при пересечении порогов
Каталог метрик и семантический поиск
from anthropic import Anthropic
import pandas as pd
from dataclasses import dataclass
import numpy as np
@dataclass
class MetricDefinition:
name: str
description: str
sql_template: str
unit: str
owner: str
tags: list[str]
class MetricsCatalog:
"""Семантический реестр бизнес-метрик"""
def __init__(self):
self.metrics = {}
self.llm = Anthropic()
self._load_default_metrics()
def _load_default_metrics(self):
definitions = [
MetricDefinition(
name="monthly_revenue",
description="Выручка за текущий месяц в рублях",
sql_template="""
SELECT SUM(amount) as value
FROM orders
WHERE status = 'completed'
AND DATE_TRUNC('month', created_at) = DATE_TRUNC('month', CURRENT_DATE)
""",
unit="RUB",
owner="finance",
tags=["revenue", "sales", "monthly"]
),
MetricDefinition(
name="dau",
description="Daily Active Users — уникальные пользователи за последние 24 часа",
sql_template="""
SELECT COUNT(DISTINCT user_id) as value
FROM user_events
WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
""",
unit="users",
owner="product",
tags=["users", "engagement", "daily"]
),
MetricDefinition(
name="conversion_rate",
description="Конверсия из регистрации в первый заказ за 30 дней",
sql_template="""
SELECT
COUNT(DISTINCT o.user_id)::float / COUNT(DISTINCT u.id) as value
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
AND o.created_at <= u.created_at + INTERVAL '30 days'
WHERE u.created_at >= CURRENT_DATE - INTERVAL '30 days'
""",
unit="%",
owner="growth",
tags=["conversion", "funnel", "growth"]
),
MetricDefinition(
name="churn_rate",
description="Доля пользователей, не вернувшихся за 30 дней",
sql_template="""
SELECT
1 - COUNT(DISTINCT CASE WHEN last_active >= CURRENT_DATE - 30 THEN user_id END)::float
/ NULLIF(COUNT(DISTINCT user_id), 0) as value
FROM user_activity_summary
WHERE cohort_month = DATE_TRUNC('month', CURRENT_DATE - INTERVAL '60 days')
""",
unit="%",
owner="retention",
tags=["churn", "retention", "monthly"]
),
]
for m in definitions:
self.metrics[m.name] = m
def find_relevant_metrics(self, question: str) -> list[MetricDefinition]:
"""Находит релевантные метрики для вопроса через LLM"""
catalog_summary = "\n".join([
f"- {name}: {m.description} (tags: {', '.join(m.tags)})"
for name, m in self.metrics.items()
])
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Given this metrics catalog:
{catalog_summary}
Question: {question}
Return only the metric names that are relevant, comma-separated. No explanation."""
}]
)
metric_names = [m.strip() for m in response.content[0].text.split(',')]
return [self.metrics[n] for n in metric_names if n in self.metrics]
Основной движок Copilot
class BIAICopilot:
def __init__(self, db_connection):
self.db = db_connection
self.llm = Anthropic()
self.catalog = MetricsCatalog()
self.history = []
def ask(self, question: str) -> dict:
"""Основной метод — вопрос на естественном языке"""
# 1. Найти релевантные метрики
relevant_metrics = self.catalog.find_relevant_metrics(question)
# 2. Получить данные
metrics_data = {}
for metric in relevant_metrics:
try:
df = pd.read_sql(metric.sql_template, self.db)
value = df['value'].iloc[0] if not df.empty else None
metrics_data[metric.name] = {
'value': value,
'unit': metric.unit,
'description': metric.description
}
except Exception as e:
metrics_data[metric.name] = {'error': str(e)}
# 3. Получить исторические данные для трендов
trends = self._get_trends(relevant_metrics)
# 4. Сгенерировать аналитический ответ
answer = self._generate_answer(question, metrics_data, trends)
# 5. Сохранить в историю
self.history.append({'question': question, 'answer': answer})
return {
'answer': answer,
'metrics': metrics_data,
'trends': trends,
'suggestions': self._generate_followup_questions(question, metrics_data)
}
def _get_trends(self, metrics: list[MetricDefinition]) -> dict:
"""30-дневный тренд для каждой метрики"""
trends = {}
for metric in metrics:
# Модифицируем SQL для получения дневных значений
trend_sql = f"""
SELECT
DATE_TRUNC('day', CURRENT_DATE - s.day_offset) as date,
({metric.sql_template.strip().rstrip(';')}
-- placeholder for date filter
) as value
FROM generate_series(0, 29) s(day_offset)
ORDER BY date
"""
# В реальности каждый шаблон должен поддерживать параметр даты
# Упрощённо: генерируем mock-данные для демонстрации структуры
trends[metric.name] = {
'direction': 'up', # up/down/flat
'change_pct': 5.2,
'volatility': 'low'
}
return trends
def _generate_answer(self, question: str, metrics_data: dict, trends: dict) -> str:
metrics_str = "\n".join([
f"- {name}: {data.get('value', 'N/A')} {data.get('unit', '')} "
f"(тренд: {trends.get(name, {}).get('direction', 'unknown')})"
for name, data in metrics_data.items()
])
# Включаем историю диалога
conversation = [
{"role": msg["role"], "content": msg["content"]}
for msg in self.history[-4:] # последние 4 сообщения
] if self.history else []
conversation.append({
"role": "user",
"content": f"""Ты BI-аналитик. Отвечай на русском.
Вопрос: {question}
Текущие метрики:
{metrics_str}
Дай конкретный ответ с числами. Укажи на аномалии если есть. 2-4 предложения."""
})
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=400,
messages=conversation
)
return response.content[0].text
def _generate_followup_questions(self, question: str, metrics_data: dict) -> list[str]:
"""3 следующих вопроса для углубления анализа"""
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=200,
messages=[{
"role": "user",
"content": f"""После вопроса "{question}" предложи 3 следующих вопроса для углубления анализа.
Верни только список вопросов, по одному на строку."""
}]
)
return [q.strip() for q in response.content[0].text.strip().split('\n') if q.strip()]
Проактивные алерты
class MetricsAlertSystem:
"""Мониторинг и push-уведомления при отклонениях"""
def __init__(self, copilot: BIAICopilot):
self.copilot = copilot
self.thresholds = {}
self.llm = Anthropic()
def set_threshold(self, metric_name: str, condition: str, value: float):
"""condition: 'below', 'above', 'change_pct'"""
self.thresholds[metric_name] = {'condition': condition, 'value': value}
def check_and_alert(self) -> list[dict]:
alerts = []
for metric_name, threshold in self.thresholds.items():
current_value = self._get_current_value(metric_name)
if current_value is None:
continue
triggered = False
if threshold['condition'] == 'below' and current_value < threshold['value']:
triggered = True
elif threshold['condition'] == 'above' and current_value > threshold['value']:
triggered = True
if triggered:
# Генерация контекстного объяснения
explanation = self._explain_anomaly(metric_name, current_value, threshold)
alerts.append({
'metric': metric_name,
'value': current_value,
'threshold': threshold,
'explanation': explanation,
'suggested_actions': self._suggest_actions(metric_name, current_value)
})
return alerts
def _explain_anomaly(self, metric: str, value: float, threshold: dict) -> str:
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=150,
messages=[{
"role": "user",
"content": f"Метрика {metric} = {value}, порог {threshold['condition']} {threshold['value']}. "
f"Кратко (2 предложения) — что это может означать и как реагировать?"
}]
)
return response.content[0].text
Интеграция с существующими BI-системами
| Система | Способ подключения | Метрики |
|---|---|---|
| Tableau | REST API + hyper extract | Published datasources |
| Power BI | Datasets API + DAX | Reports, dashboards |
| Metabase | API + card queries | Questions, dashboards |
| Looker | LookML API | Explores, looks |
| Redash | Query API | Saved queries |
| Custom SQL | Direct connection | Any table/view |
Типичный результат внедрения: время от вопроса до ответа сокращается с 2-4 часов (постановка задачи аналитику → SQL → дашборд) до 30-60 секунд. Самообслуживание закрывает 60-75% ad-hoc запросов без участия дата-команды.







