Разработка AI-системы дата-инжиниринга
AI-система дата-инжиниринга — это оркестрация всего жизненного цикла данных: автоматическое профилирование источников, генерация ETL-трансформаций, контроль качества, обнаружение аномалий и самовосстановление при сбоях. Вместо того чтобы писать каждый пайплайн вручную, система генерирует его из описания бизнес-требований.
Архитектура системы
[Data Sources] ← API, DB, S3, Kafka, files
↓
[Auto-Discovery & Profiling] ← схема, статистика, качество
↓
[AI Pipeline Generation] ← LLM → DAG код (Airflow/Prefect)
↓
[Transformation Engine] ← dbt, Spark, pandas
↓
[Quality Gate] ← Great Expectations, custom rules
↓
[Data Catalog & Lineage] ← OpenMetadata, DataHub
↓
[ML Feature Store] ← Feast, Hopsworks
↓
[Consumers] ← BI, ML models, APIs
Автогенерация ETL-пайплайнов
import json
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import pandas as pd
import yaml
from anthropic import Anthropic
@dataclass
class DataSource:
    """A data source that the pipeline generator can profile and read from.

    ``connection`` holds source-specific settings: profiling reads
    ``connection['url']`` (an SQLAlchemy URL) for ``postgres`` sources and
    ``bucket`` / ``prefix`` plus boto3 client kwargs for ``s3`` sources.
    ``schema`` stays ``None`` until the source has been profiled.
    """

    name: str
    type: str  # postgres, s3, api, kafka
    connection: dict
    schema: Optional[dict] = None  # filled in by profiling when left as None


class AIDataEngineeringSystem:
    """Generate ETL code, data-quality rules and an Airflow DAG with an LLM.

    Every LLM call goes through ``self.llm.messages.create`` with
    ``self.llm_model``. The artefacts of each ``generate_pipeline`` call are
    returned and also remembered in ``self.pipelines`` /
    ``self.quality_rules``, keyed by source name.
    """

    DEFAULT_LLM_MODEL = "claude-3-5-sonnet-20241022"

    # Keys of an S3 connection dict that describe *where* to look; every other
    # key is forwarded to ``boto3.client`` (credentials, region, endpoint...).
    _S3_LOCATION_KEYS = ("bucket", "prefix")

    def __init__(self, llm_model: str = DEFAULT_LLM_MODEL):
        self.llm = Anthropic()
        self.llm_model = llm_model
        self.pipelines = {}  # source name -> last result of generate_pipeline
        self.quality_rules = {}  # source name -> last generated quality rules

    def generate_pipeline(self, source: DataSource, target: dict,
                          business_requirements: str) -> dict:
        """Build an ETL pipeline for ``source`` from business requirements.

        Profiles the source first when ``source.schema`` is ``None``; the
        profile is stored back on ``source``. Then the LLM is asked for the
        transformation code, the quality rules and an Airflow DAG that uses
        both.

        Returns:
            dict with keys ``pipeline_code``, ``quality_rules``, ``dag`` and
            ``source_schema``.
        """
        if source.schema is None:
            source.schema = self._profile_source(source)

        pipeline_code = self._generate_transformations(
            source, target, business_requirements
        )
        quality_rules = self._generate_quality_rules(source.schema, business_requirements)
        dag = self._generate_airflow_dag(source, target, pipeline_code, quality_rules)

        result = {
            'pipeline_code': pipeline_code,
            'quality_rules': quality_rules,
            'dag': dag,
            'source_schema': source.schema,
        }
        # Remember the latest artefacts per source so callers can look them up later.
        self.pipelines[source.name] = result
        self.quality_rules[source.name] = quality_rules
        return result

    def _profile_source(self, source: DataSource) -> dict:
        """Return a schema/profile description of ``source``.

        ``postgres`` sources are introspected through SQLAlchemy, and ``s3``
        sources are listed object by object. Any other type yields ``{}``.
        """
        if source.type == 'postgres':
            return self._profile_sql(source.connection['url'])
        if source.type == 's3':
            import boto3

            client_kwargs = {
                key: value for key, value in source.connection.items()
                if key not in self._S3_LOCATION_KEYS
            }
            s3 = boto3.client('s3', **client_kwargs)
            return self._profile_s3_files(s3, source.connection)
        return {}

    @staticmethod
    def _profile_sql(url: str) -> dict:
        """Return ``{table: {'columns': {name: type}, 'row_count': int}}``.

        Tables come from the SQLAlchemy inspector (default schema). Names are
        quoted with the dialect's identifier quoting, so mixed-case or
        reserved-word table names do not break the COUNT query. The engine is
        always disposed to release its pooled connections.
        """
        import sqlalchemy

        engine = sqlalchemy.create_engine(url)
        try:
            inspector = sqlalchemy.inspect(engine)
            quote = engine.dialect.identifier_preparer.quote
            schema = {}
            for table_name in inspector.get_table_names():
                columns = inspector.get_columns(table_name)
                counts = pd.read_sql(
                    f"SELECT COUNT(*) as cnt FROM {quote(table_name)}", engine
                )
                schema[table_name] = {
                    'columns': {col['name']: str(col['type']) for col in columns},
                    # int(): pandas yields numpy.int64, which json.dumps rejects later.
                    'row_count': int(counts['cnt'].iloc[0]),
                }
            return schema
        finally:
            engine.dispose()

    @staticmethod
    def _profile_s3_files(s3, connection: dict, max_objects: int = 1000) -> dict:
        """List objects under ``connection['bucket']`` / ``connection.get('prefix', '')``.

        ``s3`` must expose ``get_paginator('list_objects_v2')``, as a boto3
        client does. Only object metadata is collected; file contents are not
        read.

        Returns:
            ``{key: {'size_bytes': int, 'last_modified': str}}`` for at most
            ``max_objects`` objects.

        Raises:
            ValueError: if the connection does not name a bucket.
        """
        bucket = connection.get('bucket')
        if not bucket:
            raise ValueError("S3 source connection must define 'bucket'")
        pages = s3.get_paginator('list_objects_v2').paginate(
            Bucket=bucket, Prefix=connection.get('prefix', '')
        )
        profile = {}
        for page in pages:
            for obj in page.get('Contents', []):
                profile[obj['Key']] = {
                    'size_bytes': obj['Size'],
                    'last_modified': str(obj['LastModified']),
                }
                if len(profile) >= max_objects:
                    return profile
        return profile

    def _ask(self, prompt: str, max_tokens: int, system: Optional[str] = None) -> str:
        """Send one user message to the LLM and return the first content block's text."""
        request = {
            'model': self.llm_model,
            'max_tokens': max_tokens,
            'messages': [{"role": "user", "content": prompt}],
        }
        if system is not None:
            request['system'] = system
        response = self.llm.messages.create(**request)
        return response.content[0].text

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return the body of ``text`` if it is a single Markdown code fence, else ``text``."""
        match = re.match(r"^\s*```[^\n]*\n(.*?)\n?```\s*$", text, re.DOTALL)
        return match.group(1) if match else text

    def _generate_transformations(self, source: DataSource, target: dict,
                                  requirements: str) -> str:
        """Ask the LLM for a ``transform(df) -> df`` function implementing the requirements.

        Returns the generated Python source, with any surrounding Markdown
        code fence removed.
        """
        schema_str = json.dumps(source.schema, indent=2)
        system = """You are a senior data engineer. Generate production-quality Python ETL code.
Use pandas/SQLAlchemy. Include error handling, logging, and type hints.
Return only Python code."""
        prompt = f"""Generate ETL transformation code.
Source: {source.type}
Source schema: {schema_str}
Target: {json.dumps(target)}
Business requirements:
{requirements}
Generate Python function def transform(df: pd.DataFrame) -> pd.DataFrame that implements the requirements."""
        return self._strip_code_fence(self._ask(prompt, max_tokens=1500, system=system))

    def _generate_quality_rules(self, schema: dict, requirements: str) -> dict:
        """Ask the LLM for Great Expectations rules and parse them as JSON.

        Only the first 1000 characters of the serialized schema are sent.
        Returns ``{"expectations": []}`` if the reply contains no JSON object.
        """
        prompt = f"""Generate Great Expectations data quality rules as JSON.
Schema: {json.dumps(schema, indent=2)[:1000]}
Requirements: {requirements}
Return JSON with expectations:
{{
"expectations": [
{{"type": "expect_column_values_to_not_be_null", "column": "id"}},
{{"type": "expect_column_values_to_be_between", "column": "amount", "min_value": 0}},
...
]
}}"""
        return self._parse_quality_rules(self._ask(prompt, max_tokens=800))

    def _parse_quality_rules(self, text: str) -> dict:
        """Extract a JSON object from an LLM reply (plain, fenced, or embedded in prose)."""
        candidates = [self._strip_code_fence(text)]
        # Fallback: the outermost {...} span, for replies with prose around the JSON.
        start, end = text.find('{'), text.rfind('}')
        if start != -1 and end > start:
            candidates.append(text[start:end + 1])
        for candidate in candidates:
            try:
                rules = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(rules, dict):
                return rules
        logging.getLogger(__name__).warning(
            "Could not parse quality rules from LLM reply; using an empty rule set"
        )
        return {"expectations": []}

    def _generate_airflow_dag(self, source: DataSource, target: dict,
                              pipeline_code: str, quality_rules: dict) -> str:
        """Ask the LLM for an Airflow 2.x DAG wiring extract -> transform -> validate -> load.

        The generated transformation code and quality rules are included in
        the prompt, so the DAG calls them instead of inventing its own.
        Returns Python source with any surrounding Markdown fence removed.
        """
        prompt = f"""Generate an Airflow DAG that:
1. Extracts data from {source.type}
2. Applies transformations
3. Validates quality rules
4. Loads to target: {json.dumps(target)}
5. Sends alerts on failure
Include: proper retries, SLA, email alerts.
Use Airflow 2.x TaskFlow API.

The transformation step must call this existing function instead of re-implementing it:
{pipeline_code}

Quality rules (Great Expectations) to validate:
{json.dumps(quality_rules, indent=2)}

Return only Python code."""
        return self._strip_code_fence(self._ask(prompt, max_tokens=1000))
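Мониторинг и самовосстановление
Монитор анализирует сбой с помощью LLM. Для типовых ошибок (таймауты, расхождение схемы, нехватка памяти, дубликаты ключей) он возвращает имя рекомендуемого действия восстановления; выполнять это действие должен оркестратор, например задача Airflow с повтором.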
class PipelineMonitor:
    """LLM-assisted failure analysis and health reporting for data pipelines.

    Each analysed failure is appended to ``self.failure_history``.
    """

    DEFAULT_LLM_MODEL = "claude-3-5-sonnet-20241022"

    # Ordered (substrings, action) rules for well-known failures; the first
    # rule with a substring present in the lower-cased error message wins.
    _AUTO_FIX_RULES = (
        (('connection refused', 'timeout'), 'retry_with_backoff'),
        (('schema mismatch', 'column not found'), 'refresh_schema_and_retry'),
        (('disk full', 'out of memory'), 'reduce_batch_size_and_retry'),
        (('duplicate key',), 'switch_to_upsert_mode'),
    )

    def __init__(self, system: "AIDataEngineeringSystem",
                 llm_model: str = DEFAULT_LLM_MODEL):
        self.system = system
        self.llm = Anthropic()
        self.llm_model = llm_model
        self.failure_history = []  # result dicts of analyze_failure, oldest first

    def _ask(self, prompt: str, max_tokens: int) -> str:
        """Send one user message to the LLM and return the first content block's text."""
        response = self.llm.messages.create(
            model=self.llm_model,
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

    def analyze_failure(self, pipeline_name: str, error: str,
                        context: dict) -> dict:
        """Ask the LLM to diagnose a pipeline failure and look up a recovery action.

        Args:
            pipeline_name: pipeline name, used in the prompt and echoed in the result.
            error: the error message. An exception object also works, since it
                is formatted with ``str()``.
            context: optional keys ``source_type``, ``records_processed``,
                ``last_success`` and ``traceback``. Only the first 500
                characters of the traceback are sent, and a missing or
                ``None`` traceback is treated as empty.

        Returns:
            dict with:
              - ``analysis``: the LLM text.
              - ``auto_fix``: the action name, or ``None``.
              - ``auto_fix_applied``: ``True`` when an action name was found.
                The action is only suggested here, not executed.
              - ``pipeline``: the pipeline name.
            The same dict is appended to ``self.failure_history``.
        """
        traceback_excerpt = str(context.get('traceback') or '')[:500]
        prompt = f"""Data pipeline "{pipeline_name}" failed.
Error: {error}
Context:
- Source: {context.get('source_type')}
- Records processed: {context.get('records_processed', 0)}
- Last successful run: {context.get('last_success')}
- Error stack: {traceback_excerpt}
Provide:
1. Root cause (1-2 sentences)
2. Immediate fix (code if applicable)
3. Long-term prevention
4. Severity: critical/warning/info"""
        analysis = self._ask(prompt, max_tokens=600)

        auto_fix = self._attempt_auto_fix(error, context)
        result = {
            'analysis': analysis,
            'auto_fix_applied': auto_fix is not None,
            'auto_fix': auto_fix,
            'pipeline': pipeline_name,
        }
        self.failure_history.append(result)
        return result

    def _attempt_auto_fix(self, error: str, context: dict) -> Optional[str]:
        """Map a well-known error message to a named recovery action.

        Matching is a case-insensitive substring search over ``str(error)``
        against ``_AUTO_FIX_RULES``, in order. ``context`` is currently unused.
        Returns ``None`` when no rule matches.
        """
        message = str(error).lower()
        for needles, action in self._AUTO_FIX_RULES:
            if any(needle in message for needle in needles):
                return action
        return None

    def generate_pipeline_report(self, pipeline_name: str,
                                 metrics: dict) -> str:
        """Summarise the last 7 days of pipeline metrics as a short ops-report paragraph.

        Non-JSON values in ``metrics`` (dates, numpy scalars, ...) are
        stringified. This is deliberate, because the result is only shown to
        the LLM as text.
        """
        prompt = f"""Summarize pipeline health for ops report.
Pipeline: {pipeline_name}
Metrics (last 7 days):
{json.dumps(metrics, indent=2, default=str)}
Give: status assessment, key issues, trend, recommended actions. 3-5 sentences."""
        return self._ask(prompt, max_tokens=400)
Интеграция с dbt для трансформаций
class DBTManager:
    """Generate dbt models, and their schema files with tests, with an LLM.

    Generated files are written under ``<project_dir>/models``.
    """

    DEFAULT_LLM_MODEL = "claude-3-5-sonnet-20241022"

    def __init__(self, project_dir: str, llm_model: str = DEFAULT_LLM_MODEL):
        self.project_dir = project_dir
        self.llm = Anthropic()
        self.llm_model = llm_model

    def _ask(self, prompt: str, max_tokens: int) -> str:
        """Send one user message to the LLM and return the first content block's text."""
        response = self.llm.messages.create(
            model=self.llm_model,
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return the body of ``text`` if it is a single Markdown code fence, else ``text``."""
        match = re.match(r"^\s*```[^\n]*\n(.*?)\n?```\s*$", text, re.DOTALL)
        return match.group(1) if match else text

    def generate_model(self, model_name: str, requirements: str,
                       source_tables: list[str]) -> str:
        """Generate a dbt SQL model plus its schema file, and write both to disk.

        Writes ``models/<model_name>.sql`` and ``models/<model_name>.yml``
        (UTF-8), creating the ``models`` directory if needed. Markdown code
        fences around the LLM output are removed, so dbt can parse the files.

        Returns:
            The SQL text written to the ``.sql`` file.
        """
        sources_info = self._get_sources_info(source_tables)
        prompt = f"""Generate a dbt SQL model.
Model name: {model_name}
Requirements: {requirements}
Available source tables: {json.dumps(sources_info)}
Generate:
1. SQL model using dbt ref() and source() macros
2. Model config block (materialization, tags)
3. Column-level descriptions as SQL comments
Return only the contents of the .sql file."""
        model_sql = self._strip_code_fence(self._ask(prompt, max_tokens=800))

        models_dir = Path(self.project_dir) / 'models'
        models_dir.mkdir(parents=True, exist_ok=True)
        (models_dir / f"{model_name}.sql").write_text(model_sql, encoding='utf-8')

        schema_yml = self._generate_schema_yaml(model_name, model_sql)
        (models_dir / f"{model_name}.yml").write_text(schema_yml, encoding='utf-8')
        return model_sql

    def _get_sources_info(self, source_tables: list[str]) -> dict:
        """Collect dbt ``sources:`` declarations for the requested tables.

        Scans ``<project_dir>/models`` recursively for ``*.yml`` / ``*.yaml``
        files and reads their top-level ``sources`` section. A requested name
        matches either a bare ``table`` or ``source.table``. Files that cannot
        be read or parsed are skipped. Tables without a declaration map to
        ``{}``, so the LLM still sees every requested name.
        """
        info = {name: {} for name in source_tables}
        wanted = set(source_tables)
        models_dir = Path(self.project_dir) / 'models'
        if not wanted or not models_dir.is_dir():
            return info

        yaml_files = sorted([*models_dir.rglob('*.yml'), *models_dir.rglob('*.yaml')])
        for path in yaml_files:
            try:
                document = yaml.safe_load(path.read_text(encoding='utf-8'))
            except (OSError, UnicodeDecodeError, yaml.YAMLError):
                continue
            if not isinstance(document, dict):
                continue
            for source in document.get('sources') or []:
                if not isinstance(source, dict):
                    continue
                source_name = source.get('name')
                for table in source.get('tables') or []:
                    if not isinstance(table, dict):
                        continue
                    table_name = table.get('name')
                    description = {
                        'source': source_name,
                        'table': table_name,
                        'description': table.get('description', ''),
                        'columns': [
                            {'name': col.get('name'), 'description': col.get('description', '')}
                            for col in table.get('columns') or []
                            if isinstance(col, dict)
                        ],
                    }
                    for key in (table_name, f"{source_name}.{table_name}"):
                        if key in wanted:
                            info[key] = description
        return info

    def _generate_schema_yaml(self, model_name: str, model_sql: str) -> str:
        """Ask the LLM for a dbt schema.yml (descriptions and tests) for the model.

        Only the first 1000 characters of the SQL are sent. A surrounding
        Markdown code fence is removed from the reply; the YAML itself is not
        validated.
        """
        prompt = f"""Generate dbt schema.yml for this model with data tests.
Model: {model_name}
SQL: {model_sql[:1000]}
Include: column descriptions, not_null tests, unique tests, accepted_values where relevant.
Return valid YAML."""
        return self._strip_code_fence(self._ask(prompt, max_tokens=500))
Экономия времени с AI-системой
| Задача | Ручная работа | С AI-системой | Экономия |
|---|---|---|---|
| Новый источник данных | 3-5 дней | 4-6 часов | 85% |
| ETL трансформация | 1-2 дня | 2-3 часа | 80% |
| Правила качества | 4-8 часов | 30 минут | 87% |
| Документация | 1-2 дня | 1-2 часа | 88% |
| Диагностика сбоев | 2-4 часа | 15-30 минут | 87% |
Полноценная AI-система дата-инжиниринга разворачивается за 4-6 недель. Команда из 3 дата-инженеров с системой справляется с задачами, которые раньше требовали 7-8 человек.







