AI-система автоматического обнаружения аномалий в данных
Аномалии в данных — это не только подозрительные значения, но и проблемы качества данных: технические сбои, ошибки ETL, дрейф схемы, изменение поведения источников. Система мониторинга качества данных (Data Quality Monitoring) обнаруживает их автоматически, не дожидаясь жалоб аналитиков.
Типология аномалий в данных
Классификация по природе:
data_anomaly_types = {
# Статистические аномалии значений
'point_anomaly': 'одно значение резко выбивается из ряда',
'contextual_anomaly': 'значение нормально в другом контексте (лето-зима)',
'collective_anomaly': 'группа нормальных значений образует ненормальный паттерн',
# Аномалии качества данных
'schema_drift': 'новый столбец появился, старый исчез, тип изменился',
'distribution_drift': 'распределение признака сдвинулось (feature drift)',
'cardinality_anomaly': 'резкий рост уникальных значений в категориальном поле',
'null_spike': 'процент NULL вырос с 0% до 40% за сутки',
'volume_anomaly': 'количество записей за период аномально мало или велико',
'freshness_anomaly': 'данные не обновлялись дольше ожидаемого'
}
Data Quality Monitoring
Автоматические проверки качества:
import pandas as pd
import numpy as np
from scipy.stats import ks_2samp
class DataQualityMonitor:
def __init__(self, table_name: str, baseline_stats: dict):
self.table_name = table_name
self.baseline = baseline_stats
def run_quality_checks(self, current_df: pd.DataFrame) -> dict:
results = {'table': self.table_name, 'checks': [], 'issues': []}
# 1. Проверка объёма
row_count = len(current_df)
baseline_rows = self.baseline.get('row_count_mean', row_count)
baseline_rows_std = self.baseline.get('row_count_std', row_count * 0.1)
volume_z = (row_count - baseline_rows) / (baseline_rows_std + 1e-9)
if abs(volume_z) > 3:
results['issues'].append({
'check': 'volume',
'severity': 'critical' if abs(volume_z) > 5 else 'warning',
'current': row_count,
'expected': int(baseline_rows),
'z_score': round(volume_z, 2)
})
# 2. NULL ratio по колонкам
for col in current_df.columns:
null_pct = current_df[col].isnull().mean() * 100
baseline_null = self.baseline.get(f'{col}_null_pct', 0)
if null_pct > baseline_null + 10: # >10% роста
results['issues'].append({
'check': 'null_spike',
'column': col,
'severity': 'major' if null_pct > 50 else 'warning',
'current_null_pct': round(null_pct, 1),
'baseline_null_pct': round(baseline_null, 1)
})
# 3. Дрейф распределения (KS-тест)
for col in current_df.select_dtypes(include=[np.number]).columns:
if f'{col}_sample' in self.baseline:
stat, p_value = ks_2samp(
self.baseline[f'{col}_sample'],
current_df[col].dropna().values
)
if p_value < 0.001:
results['issues'].append({
'check': 'distribution_drift',
'column': col,
'severity': 'warning',
'ks_statistic': round(stat, 3),
'p_value': round(p_value, 5)
})
results['passed'] = len(results['issues']) == 0
return results
Автоматическое профилирование и baseline
Построение baseline из исторических данных:
def build_data_baseline(historical_batches: list[pd.DataFrame]) -> dict:
"""
Baseline = статистика за последние 30 дней (обновляется еженедельно).
"""
row_counts = [len(df) for df in historical_batches]
baseline = {
'row_count_mean': np.mean(row_counts),
'row_count_std': np.std(row_counts),
'row_count_min': np.min(row_counts),
'row_count_max': np.max(row_counts)
}
if historical_batches:
sample_df = pd.concat(historical_batches[-7:]) # последняя неделя
for col in sample_df.select_dtypes(include=[np.number]).columns:
col_data = sample_df[col].dropna()
baseline[f'{col}_mean'] = col_data.mean()
baseline[f'{col}_std'] = col_data.std()
baseline[f'{col}_p5'] = col_data.quantile(0.05)
baseline[f'{col}_p95'] = col_data.quantile(0.95)
baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
# Храним 500 сэмплов для KS-теста
baseline[f'{col}_sample'] = col_data.sample(min(500, len(col_data))).values
for col in sample_df.select_dtypes(include=['object', 'category']).columns:
baseline[f'{col}_cardinality'] = sample_df[col].nunique()
baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
baseline[f'{col}_top_values'] = sample_df[col].value_counts().head(20).to_dict()
return baseline
Детекция аномалий в многомерных данных
Isolation Forest для табличных данных:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, LabelEncoder
def detect_row_level_anomalies(df: pd.DataFrame,
contamination: float = 0.02) -> pd.DataFrame:
"""
Обнаружение аномальных записей (не только отдельных значений).
Полезно для: транзакционные данные, логи, CRM записи.
"""
# Препроцессинг
df_processed = df.copy()
for col in df_processed.select_dtypes(include=['object']).columns:
le = LabelEncoder()
df_processed[col] = le.fit_transform(df_processed[col].astype(str))
df_numeric = df_processed.select_dtypes(include=[np.number]).fillna(-999)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_numeric)
model = IsolationForest(contamination=contamination, random_state=42)
anomaly_labels = model.fit_predict(X_scaled)
anomaly_scores = -model.score_samples(X_scaled)
df['is_anomaly'] = anomaly_labels == -1
df['anomaly_score'] = anomaly_scores
# Объяснение: какие признаки наиболее аномальны
df_anomalies = df[df['is_anomaly']].copy()
return df_anomalies.sort_values('anomaly_score', ascending=False)
Schema Drift Detection
Мониторинг изменений схемы данных:
def detect_schema_drift(current_schema: dict, baseline_schema: dict) -> dict:
"""
Сравниваем схему текущих данных со схемой из baseline.
Критично для ETL пайплайнов: изменение upstream источника ломает downstream.
"""
issues = []
# Пропавшие столбцы
missing_cols = set(baseline_schema.keys()) - set(current_schema.keys())
for col in missing_cols:
issues.append({
'type': 'column_dropped',
'column': col,
'severity': 'critical',
'action': 'check_upstream_source'
})
# Новые столбцы
new_cols = set(current_schema.keys()) - set(baseline_schema.keys())
for col in new_cols:
issues.append({
'type': 'column_added',
'column': col,
'severity': 'info',
'action': 'review_and_update_documentation'
})
# Изменение типов
for col in set(baseline_schema.keys()) & set(current_schema.keys()):
if baseline_schema[col] != current_schema[col]:
issues.append({
'type': 'type_changed',
'column': col,
'from': baseline_schema[col],
'to': current_schema[col],
'severity': 'major',
'action': 'validate_downstream_compatibility'
})
return {
'schema_drift_detected': len(issues) > 0,
'critical_issues': [i for i in issues if i['severity'] == 'critical'],
'all_issues': issues
}
Интеграция с Data Platform: Great Expectations для декларативных тестов, dbt tests для трансформаций, Apache Atlas / Datahub для data lineage. Алерты в Slack, PagerDuty, email при severity >= 'major'. Дашборд в Grafana с history quality score по каждой таблице.
Сроки: Quality checks (volume, nulls, schema) + базовый дашборд — 2-3 недели. Distribution drift (KS-тест), row-level Isolation Forest, Great Expectations интеграция, lineage tracking — 2-3 месяца.







