Реализация AI-автогенерации ETL-пайплайнов
Автогенерация ETL — подход, при котором дата-инженер описывает задачу на естественном языке (например, на русском), а система генерирует готовый код пайплайна: Airflow DAG, dbt-модели, Prefect flow или Python-скрипт. Это сокращает время от постановки задачи до работающего пайплайна с 1–3 дней до 2–4 часов.
Движок генерации пайплайнов
import json
import logging
import re
from dataclasses import dataclass, field

import yaml
from anthropic import Anthropic
@dataclass
class PipelineSpec:
name: str
description: str
source: dict # {type, connection, table/path}
target: dict # {type, connection, table/path}
transformations: list[str]
schedule: str = "@daily"
framework: str = "airflow" # airflow, prefect, dbt, pandas
class ETLAutoGenerator:
def __init__(self):
self.llm = Anthropic()
def generate_from_description(self, description: str,
source_schema: dict = None,
framework: str = "airflow") -> dict:
"""Генерация полного ETL из текстового описания"""
# Шаг 1: Структурирование требований
spec = self._parse_requirements(description, source_schema)
# Шаг 2: Генерация кода
if framework == "airflow":
code = self._generate_airflow_dag(spec)
elif framework == "dbt":
code = self._generate_dbt_model(spec)
elif framework == "prefect":
code = self._generate_prefect_flow(spec)
else:
code = self._generate_pandas_script(spec)
# Шаг 3: Тесты и документация
tests = self._generate_tests(spec, code)
docs = self._generate_documentation(spec)
return {
'spec': spec,
'code': code,
'tests': tests,
'documentation': docs
}
def _parse_requirements(self, description: str,
schema: dict = None) -> PipelineSpec:
"""LLM структурирует текстовые требования"""
schema_str = json.dumps(schema, indent=2) if schema else "Not provided"
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=600,
messages=[{
"role": "user",
"content": f"""Parse this ETL requirement into a structured spec.
Description: {description}
Available schema: {schema_str}
Return JSON:
{{
"name": "pipeline_snake_case_name",
"description": "one sentence description",
"source": {{
"type": "postgres|mysql|s3|api|kafka",
"table_or_path": "table or path name"
}},
"target": {{
"type": "postgres|bigquery|s3|snowflake",
"table_or_path": "output table"
}},
"transformations": [
"list of transformation steps in order"
],
"schedule": "cron expression or @daily/@hourly",
"quality_checks": ["list of data quality validations needed"]
}}"""
}]
)
try:
data = json.loads(response.content[0].text)
return PipelineSpec(
name=data.get('name', 'generated_pipeline'),
description=data.get('description', ''),
source=data.get('source', {}),
target=data.get('target', {}),
transformations=data.get('transformations', []),
schedule=data.get('schedule', '@daily')
)
except Exception:
return PipelineSpec(
name='generated_pipeline',
description=description,
source={},
target={},
transformations=[]
)
def _generate_airflow_dag(self, spec: PipelineSpec) -> str:
"""Генерация Airflow DAG"""
transforms_str = "\n".join(f"- {t}" for t in spec.transformations)
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1500,
system="""You are a senior data engineer. Generate production-quality Airflow 2.x DAG code.
Use TaskFlow API (@task decorator). Include: error handling, retries, SLA, proper connections.
Return only Python code.""",
messages=[{
"role": "user",
"content": f"""Generate Airflow DAG for this pipeline:
Name: {spec.name}
Description: {spec.description}
Source: {json.dumps(spec.source)}
Target: {json.dumps(spec.target)}
Schedule: {spec.schedule}
Transformations to implement:
{transforms_str}
Include:
1. Proper imports
2. DAG configuration with retries=2, retry_delay=5min, SLA=1hour
3. Modular @task functions for each transformation step
4. Data quality validation task
5. Email alert on failure"""
}]
)
return response.content[0].text
def _generate_dbt_model(self, spec: PipelineSpec) -> dict:
"""Генерация dbt модели + schema.yml"""
transforms_str = "\n".join(f"- {t}" for t in spec.transformations)
sql_response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=800,
messages=[{
"role": "user",
"content": f"""Generate a dbt SQL model.
Model name: {spec.name}
Description: {spec.description}
Source: {json.dumps(spec.source)}
Transformations:
{transforms_str}
Use dbt {{ config() }}, {{ ref() }}, {{ source() }} macros.
Include comments explaining each transformation."""
}]
)
yaml_response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Generate dbt schema.yml for model "{spec.name}".
Include: description, column descriptions, not_null/unique/accepted_values tests.
Base on: {spec.description}
Return valid YAML."""
}]
)
return {
f"{spec.name}.sql": sql_response.content[0].text,
f"{spec.name}.yml": yaml_response.content[0].text
}
def _generate_prefect_flow(self, spec: PipelineSpec) -> str:
"""Генерация Prefect 2.x Flow"""
transforms_str = "\n".join(f"- {t}" for t in spec.transformations)
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
system="Generate Prefect 2.x flow code. Use @task and @flow decorators. Include retries and logging.",
messages=[{
"role": "user",
"content": f"""Generate Prefect flow:
Name: {spec.name}
Source: {json.dumps(spec.source)}
Target: {json.dumps(spec.target)}
Transformations: {transforms_str}
Schedule: {spec.schedule}"""
}]
)
return response.content[0].text
def _generate_pandas_script(self, spec: PipelineSpec) -> str:
"""Простой Python/pandas скрипт для небольших датасетов"""
transforms_str = "\n".join(f"- {t}" for t in spec.transformations)
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=800,
system="Generate production Python ETL script. Include logging, error handling, type hints.",
messages=[{
"role": "user",
"content": f"""Generate Python ETL script:
Source: {json.dumps(spec.source)}
Target: {json.dumps(spec.target)}
Transformations: {transforms_str}"""
}]
)
return response.content[0].text
def _generate_tests(self, spec: PipelineSpec, code: str) -> str:
"""Генерация unit тестов для пайплайна"""
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=600,
messages=[{
"role": "user",
"content": f"""Generate pytest unit tests for this ETL pipeline.
Pipeline description: {spec.description}
Code snippet: {code[:500]}
Include:
1. Tests for each transformation function
2. Edge cases (empty input, null values, duplicates)
3. Data type validation tests"""
}]
)
return response.content[0].text
Итеративное уточнение через диалог
def refine_pipeline(self, generated_code: str,
                    feedback: str, max_tokens: int = 2000) -> str:
    """Revise previously generated pipeline code according to reviewer feedback.

    Intended to be used as an ``ETLAutoGenerator`` method: it reads
    ``self.llm`` (an object exposing ``messages.create``) and, when present,
    ``self.model``.

    Args:
        generated_code: Code to revise. A dbt ``{filename: text}`` dict is
            flattened into labelled sections.
        feedback: Requested changes, in any language.
        max_tokens: Output budget. The default is larger than the DAG
            generation budget, so a full revised DAG is not cut off.

    Returns:
        The revised code; a single surrounding Markdown fence is removed.
    """
    if isinstance(generated_code, dict):
        generated_code = "\n\n".join(
            f"File: {fname}\n{body}" for fname, body in generated_code.items()
        )
    response = self.llm.messages.create(
        model=getattr(self, "model", "claude-3-5-sonnet-20241022"),
        max_tokens=max_tokens,
        system=("You are a senior data engineer revising ETL pipeline code. "
                "Apply the requested changes and return only the complete modified code."),
        # One self-contained turn: code plus requested change.
        messages=[{
            "role": "user",
            "content": (f"Here's a generated ETL pipeline:\n\n{generated_code}"
                        f"\n\nPlease modify it: {feedback}"),
        }],
    )
    if getattr(response, "stop_reason", None) == "max_tokens":
        logging.getLogger(__name__).warning(
            "Refined pipeline hit max_tokens=%d and is probably truncated", max_tokens)
    text = "".join(getattr(block, "text", "") for block in response.content)
    # Strip a fence only when it wraps the whole reply (```lang\n...\n```).
    match = re.match(r"^```[^\n]*\n(.*?)\n?```$", text.strip(), re.DOTALL)
    return match.group(1) if match else text
Типичный процесс: описание задачи (5 минут) → генерация кода (2–3 минуты) → ревью и итерации (30–60 минут) → тестирование и деплой. Традиционный процесс: разбор требований (1 час) → разработка (1–2 дня) → тестирование (полдня). Экономия на типовых ETL-задачах — 80–85% времени.







