Реализация AI-ETL пайплайна обработки данных
Классический ETL не справляется с неструктурированными данными: PDF с таблицами, HTML с динамическим контентом, изображения с данными, аудио-транскрипты. AI-ETL добавляет слой понимания: извлечение данных из произвольных форматов, нормализация через LLM и интеллектуальная валидация с объяснением ошибок.
Архитектура AI-ETL
from anthropic import Anthropic
import pandas as pd
import json
from dataclasses import dataclass
from typing import Any, Callable
import logging
@dataclass
class ETLStep:
name: str
func: Callable
depends_on: list[str] = None
retry_on_failure: bool = True
max_retries: int = 3
class AIETLPipeline:
def __init__(self, pipeline_name: str):
self.name = pipeline_name
self.llm = Anthropic()
self.steps = []
self.context = {} # Данные между шагами
self.metrics = {}
self.logger = logging.getLogger(pipeline_name)
def add_step(self, step: ETLStep):
self.steps.append(step)
def run(self, initial_data: Any) -> dict:
"""Выполнение пайплайна"""
self.context['input'] = initial_data
errors = []
for step in self.steps:
try:
self.logger.info(f"Running step: {step.name}")
input_data = self.context.get(
step.depends_on[0] if step.depends_on else 'input'
)
result = step.func(input_data, self.context)
self.context[step.name] = result
self.metrics[step.name] = {'status': 'success'}
except Exception as e:
self.logger.error(f"Step {step.name} failed: {e}")
errors.append({'step': step.name, 'error': str(e)})
if step.retry_on_failure:
# AI-assisted recovery
fixed_result = self._ai_recover(step, input_data, str(e))
if fixed_result is not None:
self.context[step.name] = fixed_result
self.metrics[step.name] = {'status': 'recovered'}
continue
self.metrics[step.name] = {'status': 'failed', 'error': str(e)}
break
return {'context': self.context, 'metrics': self.metrics, 'errors': errors}
def _ai_recover(self, step: ETLStep, input_data: Any, error: str) -> Any:
"""Попытка восстановления после ошибки через LLM"""
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=400,
messages=[{
"role": "user",
"content": f"""ETL step "{step.name}" failed.
Error: {error}
Input data type: {type(input_data).__name__}
Input sample: {str(input_data)[:500]}
Suggest recovery: should we skip this step, use default values, or transform input differently?
Respond with JSON: {{"action": "skip|default|transform", "reason": "...", "default_value": ...}}"""
}]
)
try:
decision = json.loads(response.content[0].text)
if decision['action'] == 'skip':
return input_data # Pass through unchanged
elif decision['action'] == 'default':
return decision.get('default_value')
except Exception:
pass
return None
Извлечение данных из неструктурированных источников
class AIExtractor:
"""Извлечение структурированных данных из произвольных форматов"""
def __init__(self):
self.llm = Anthropic()
def extract_from_pdf(self, pdf_path: str, schema: dict) -> list[dict]:
"""PDF → структурированные записи"""
import pdfplumber
all_records = []
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages):
# Таблицы
for table in page.extract_tables():
if table and len(table) > 1:
df = pd.DataFrame(table[1:], columns=table[0])
records = self._normalize_table_with_ai(df, schema)
all_records.extend(records)
# Текст
text = page.extract_text()
if text and len(text) > 100:
text_records = self._extract_from_text(text, schema)
all_records.extend(text_records)
return all_records
def _extract_from_text(self, text: str, schema: dict) -> list[dict]:
"""LLM-извлечение по схеме из произвольного текста"""
schema_str = json.dumps(schema, ensure_ascii=False, indent=2)
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=800,
messages=[{
"role": "user",
"content": f"""Extract structured data from this text according to the schema.
Return JSON array of records. Use null for missing fields.
Schema:
{schema_str}
Text:
{text[:2000]}
Return only JSON array."""
}]
)
try:
text_response = response.content[0].text.strip()
if '```' in text_response:
text_response = text_response.split('```')[1]
if text_response.startswith('json\n'):
text_response = text_response[5:]
return json.loads(text_response)
except Exception:
return []
def _normalize_table_with_ai(self, df: pd.DataFrame, schema: dict) -> list[dict]:
"""Нормализация таблицы с нестандартными заголовками"""
columns_str = ", ".join(df.columns.tolist())
schema_fields = list(schema.keys())
# Маппинг колонок
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=200,
messages=[{
"role": "user",
"content": f"""Map these table columns to schema fields.
Table columns: {columns_str}
Schema fields: {', '.join(schema_fields)}
Return JSON object: {{"table_column": "schema_field"}}. Use null for unmapped."""
}]
)
try:
column_map = json.loads(response.content[0].text)
df_renamed = df.rename(columns={k: v for k, v in column_map.items() if v})
return df_renamed[schema_fields].where(df_renamed.notna(), None).to_dict('records')
except Exception:
return df.to_dict('records')
Трансформации с AI-валидацией
class AITransformer:
"""Умные трансформации с объяснением аномалий"""
def __init__(self):
self.llm = Anthropic()
def clean_and_normalize(self, df: pd.DataFrame,
business_rules: list[str]) -> dict:
"""Очистка + AI-объяснение найденных проблем"""
issues = []
original_count = len(df)
# Стандартные проверки
nulls = df.isnull().sum()
duplicates = df.duplicated().sum()
if nulls.sum() > 0:
issues.append(f"Null values: {nulls[nulls > 0].to_dict()}")
if duplicates > 0:
issues.append(f"Duplicate rows: {duplicates}")
# Проверка бизнес-правил через LLM
if business_rules and len(df) > 0:
sample = df.head(5).to_string()
rules_str = "\n".join(f"- {r}" for r in business_rules)
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=400,
messages=[{
"role": "user",
"content": f"""Check these data quality rules against the sample data.
Business rules:
{rules_str}
Data sample:
{sample}
List violations found (if any), be specific with row/column references.
If no violations, say "No violations found"."""
}]
)
rule_check = response.content[0].text
if "No violations" not in rule_check:
issues.append(f"Business rule violations: {rule_check}")
# Автоочистка
df_clean = df.drop_duplicates()
df_clean = df_clean.dropna(subset=[col for col in df.columns
if df[col].isnull().mean() < 0.5])
return {
'data': df_clean,
'original_count': original_count,
'cleaned_count': len(df_clean),
'removed': original_count - len(df_clean),
'issues': issues,
'quality_score': 1 - len(issues) * 0.1
}
Мониторинг пайплайна
class ETLMonitor:
"""Метрики и алертинг для AI-ETL"""
def generate_run_report(self, pipeline_result: dict,
expected_records: int = None) -> str:
metrics = pipeline_result['metrics']
errors = pipeline_result['errors']
response = self.llm.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Summarize ETL run results for ops team.
Pipeline steps: {json.dumps(metrics)}
Errors: {errors}
Expected records: {expected_records}
Give: status (OK/WARNING/FAILED), key issues, recommended actions. 3-5 sentences."""
}]
)
return response.content[0].text
AI-ETL типично снижает время разработки трансформаций для новых источников данных с 2-3 дней до 4-8 часов. Автовосстановление после ошибок обрабатывает 70-80% типовых сбоев (изменение схемы источника, временные проблемы сети) без участия инженера.







