AI-система автоматического обнаружения аномалий в данных

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
AI-система автоматического обнаружения аномалий в данных
Средняя
~2-4 недели
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1218
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    854
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1047
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    561
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    825

AI-система автоматического обнаружения аномалий в данных

Аномалии в данных — это не только подозрительные значения, но и проблемы качества данных: технические сбои, ошибки ETL, дрейф схемы, изменение поведения источников. Система мониторинга качества данных (Data Quality Monitoring) обнаруживает их автоматически, не дожидаясь жалоб аналитиков.

Типология аномалий в данных

Классификация по природе:

data_anomaly_types = {
    # Статистические аномалии значений
    'point_anomaly': 'одно значение резко выбивается из ряда',
    'contextual_anomaly': 'значение нормально в другом контексте (лето-зима)',
    'collective_anomaly': 'группа нормальных значений образует ненормальный паттерн',

    # Аномалии качества данных
    'schema_drift': 'новый столбец появился, старый исчез, тип изменился',
    'distribution_drift': 'распределение признака сдвинулось (feature drift)',
    'cardinality_anomaly': 'резкий рост уникальных значений в категориальном поле',
    'null_spike': 'процент NULL вырос с 0% до 40% за сутки',
    'volume_anomaly': 'количество записей за период аномально мало или велико',
    'freshness_anomaly': 'данные не обновлялись дольше ожидаемого'
}

Data Quality Monitoring

Автоматические проверки качества:

import pandas as pd
import numpy as np
from scipy.stats import ks_2samp

class DataQualityMonitor:
    def __init__(self, table_name: str, baseline_stats: dict):
        self.table_name = table_name
        self.baseline = baseline_stats

    def run_quality_checks(self, current_df: pd.DataFrame) -> dict:
        results = {'table': self.table_name, 'checks': [], 'issues': []}

        # 1. Проверка объёма
        row_count = len(current_df)
        baseline_rows = self.baseline.get('row_count_mean', row_count)
        baseline_rows_std = self.baseline.get('row_count_std', row_count * 0.1)

        volume_z = (row_count - baseline_rows) / (baseline_rows_std + 1e-9)
        if abs(volume_z) > 3:
            results['issues'].append({
                'check': 'volume',
                'severity': 'critical' if abs(volume_z) > 5 else 'warning',
                'current': row_count,
                'expected': int(baseline_rows),
                'z_score': round(volume_z, 2)
            })

        # 2. NULL ratio по колонкам
        for col in current_df.columns:
            null_pct = current_df[col].isnull().mean() * 100
            baseline_null = self.baseline.get(f'{col}_null_pct', 0)

            if null_pct > baseline_null + 10:  # >10% роста
                results['issues'].append({
                    'check': 'null_spike',
                    'column': col,
                    'severity': 'major' if null_pct > 50 else 'warning',
                    'current_null_pct': round(null_pct, 1),
                    'baseline_null_pct': round(baseline_null, 1)
                })

        # 3. Дрейф распределения (KS-тест)
        for col in current_df.select_dtypes(include=[np.number]).columns:
            if f'{col}_sample' in self.baseline:
                stat, p_value = ks_2samp(
                    self.baseline[f'{col}_sample'],
                    current_df[col].dropna().values
                )
                if p_value < 0.001:
                    results['issues'].append({
                        'check': 'distribution_drift',
                        'column': col,
                        'severity': 'warning',
                        'ks_statistic': round(stat, 3),
                        'p_value': round(p_value, 5)
                    })

        results['passed'] = len(results['issues']) == 0
        return results

Автоматическое профилирование и baseline

Построение baseline из исторических данных:

def build_data_baseline(historical_batches: list[pd.DataFrame]) -> dict:
    """
    Baseline = статистика за последние 30 дней (обновляется еженедельно).
    """
    row_counts = [len(df) for df in historical_batches]

    baseline = {
        'row_count_mean': np.mean(row_counts),
        'row_count_std': np.std(row_counts),
        'row_count_min': np.min(row_counts),
        'row_count_max': np.max(row_counts)
    }

    if historical_batches:
        sample_df = pd.concat(historical_batches[-7:])  # последняя неделя

        for col in sample_df.select_dtypes(include=[np.number]).columns:
            col_data = sample_df[col].dropna()
            baseline[f'{col}_mean'] = col_data.mean()
            baseline[f'{col}_std'] = col_data.std()
            baseline[f'{col}_p5'] = col_data.quantile(0.05)
            baseline[f'{col}_p95'] = col_data.quantile(0.95)
            baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
            # Храним 500 сэмплов для KS-теста
            baseline[f'{col}_sample'] = col_data.sample(min(500, len(col_data))).values

        for col in sample_df.select_dtypes(include=['object', 'category']).columns:
            baseline[f'{col}_cardinality'] = sample_df[col].nunique()
            baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
            baseline[f'{col}_top_values'] = sample_df[col].value_counts().head(20).to_dict()

    return baseline

Детекция аномалий в многомерных данных

Isolation Forest для табличных данных:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, LabelEncoder

def detect_row_level_anomalies(df: pd.DataFrame,
                                 contamination: float = 0.02) -> pd.DataFrame:
    """
    Обнаружение аномальных записей (не только отдельных значений).
    Полезно для: транзакционные данные, логи, CRM записи.
    """
    # Препроцессинг
    df_processed = df.copy()

    for col in df_processed.select_dtypes(include=['object']).columns:
        le = LabelEncoder()
        df_processed[col] = le.fit_transform(df_processed[col].astype(str))

    df_numeric = df_processed.select_dtypes(include=[np.number]).fillna(-999)

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(df_numeric)

    model = IsolationForest(contamination=contamination, random_state=42)
    anomaly_labels = model.fit_predict(X_scaled)
    anomaly_scores = -model.score_samples(X_scaled)

    df['is_anomaly'] = anomaly_labels == -1
    df['anomaly_score'] = anomaly_scores

    # Объяснение: какие признаки наиболее аномальны
    df_anomalies = df[df['is_anomaly']].copy()
    return df_anomalies.sort_values('anomaly_score', ascending=False)

Schema Drift Detection

Мониторинг изменений схемы данных:

def detect_schema_drift(current_schema: dict, baseline_schema: dict) -> dict:
    """
    Сравниваем схему текущих данных со схемой из baseline.
    Критично для ETL пайплайнов: изменение upstream источника ломает downstream.
    """
    issues = []

    # Пропавшие столбцы
    missing_cols = set(baseline_schema.keys()) - set(current_schema.keys())
    for col in missing_cols:
        issues.append({
            'type': 'column_dropped',
            'column': col,
            'severity': 'critical',
            'action': 'check_upstream_source'
        })

    # Новые столбцы
    new_cols = set(current_schema.keys()) - set(baseline_schema.keys())
    for col in new_cols:
        issues.append({
            'type': 'column_added',
            'column': col,
            'severity': 'info',
            'action': 'review_and_update_documentation'
        })

    # Изменение типов
    for col in set(baseline_schema.keys()) & set(current_schema.keys()):
        if baseline_schema[col] != current_schema[col]:
            issues.append({
                'type': 'type_changed',
                'column': col,
                'from': baseline_schema[col],
                'to': current_schema[col],
                'severity': 'major',
                'action': 'validate_downstream_compatibility'
            })

    return {
        'schema_drift_detected': len(issues) > 0,
        'critical_issues': [i for i in issues if i['severity'] == 'critical'],
        'all_issues': issues
    }

Интеграция с Data Platform: Great Expectations для декларативных тестов, dbt tests для трансформаций, Apache Atlas / Datahub для data lineage. Алерты в Slack, PagerDuty, email при severity >= 'major'. Дашборд в Grafana с history quality score по каждой таблице.

Сроки: Quality checks (volume, nulls, schema) + базовый дашборд — 2-3 недели. Distribution drift (KS-тест), row-level Isolation Forest, Great Expectations интеграция, lineage tracking — 2-3 месяца.