Реализация AI-трекинга линейки данных (Data Lineage)
Data Lineage — это граф, который показывает, откуда данные пришли, через какие трансформации прошли и куда попали. Без него при ошибке в колонке невозможно быстро найти источник проблемы. AI-линейка автоматически строит этот граф из SQL, dbt-моделей и кода трансформаций без ручной документации.
Парсинг SQL для линейки
import json
from collections import deque
from dataclasses import dataclass
from typing import Optional

import networkx as nx
import sqlglot
import sqlparse
from anthropic import Anthropic
@dataclass
class LineageNode:
    """A vertex of the lineage graph.

    Plain data holder; it has no behavior beyond what ``@dataclass``
    generates (``__init__``, ``__repr__``, ``__eq__``).
    """

    node_id: str
    name: str
    node_type: str  # table, view, query, model, api, file
    # Both optional fields default to None, so the annotation must admit None.
    schema: Optional[dict] = None
    metadata: Optional[dict] = None
@dataclass
class LineageEdge:
    """A directed edge of the lineage graph, from ``source`` to ``target``.

    Plain data holder; it has no behavior beyond what ``@dataclass``
    generates.
    """

    source: str
    target: str
    transform_type: str  # select, join, aggregation, filter, union
    # Defaults to None, so the annotation must admit None.
    columns_mapped: Optional[dict] = None  # {source_col: target_col}
class DataLineageTracker:
    """Builds a lineage graph and answers questions about it."""

    def __init__(self):
        """Set up the LLM client, an empty directed graph and an empty node map."""
        # Client used by the LLM-backed helpers.
        self.llm = Anthropic()
        # Directed graph that holds the lineage edges.
        self.graph = nx.DiGraph()
        # Starts empty; nothing in the visible code fills it yet.
        self.nodes = {}
def parse_sql_lineage(self, sql: str, output_table: str = None) -> dict:
    """Extract table-level lineage from a SQL script.

    The script is parsed with sqlglot. If anything on that path raises, the
    SQL is handed to ``_llm_parse_lineage`` instead.

    Args:
        sql: One or more SQL statements.
        output_table: Optional name of the table the result is written to;
            added to ``targets`` when truthy.

    Returns:
        On the sqlglot path, a dict with:
          * ``sources`` - unique names of tables read, in first-seen order;
          * ``targets`` - unique names of tables written;
          * ``columns`` - always empty here, kept for compatibility;
          * ``column_mapping`` - result of ``_extract_column_mapping``.
        On fallback, whatever ``_llm_parse_lineage`` returns.
    """
    try:
        exp = sqlglot.expressions
        lineage = {'sources': [], 'targets': [], 'columns': {}}

        for stmt in sqlglot.parse(sql):
            if stmt is None:
                # sqlglot yields None for empty statements (e.g. a stray ";").
                continue

            # Target of CREATE ... / INSERT INTO. It is itself a Table node
            # inside the statement, so remember it to keep it out of sources.
            target_node = None
            if isinstance(stmt, (exp.Create, exp.Insert)):
                written = stmt.this
                if written is not None:
                    # ``this`` may be a Schema wrapping the table together
                    # with a column list; take the table itself when present.
                    target_node = written.find(exp.Table)
                target_name = str(target_node if target_node is not None else written)
                if target_name not in lineage['targets']:
                    lineage['targets'].append(target_name)

            # CTE aliases look like tables in FROM/JOIN but are not sources.
            cte_names = {cte.alias for cte in stmt.find_all(exp.CTE)}

            for table in stmt.find_all(exp.Table):
                if table is target_node:
                    continue
                name = table.name
                if not name:
                    continue
                # A schema-qualified reference cannot point at a CTE.
                if name in cte_names and not table.db:
                    continue
                if name not in lineage['sources']:
                    lineage['sources'].append(name)

        if output_table and output_table not in lineage['targets']:
            lineage['targets'].append(output_table)

        # Column mapping is derived with sqlglot as well (no LLM on this path).
        lineage['column_mapping'] = self._extract_column_mapping(sql)
        return lineage
    except Exception:
        # Deliberate best-effort: any sqlglot failure falls back to the LLM.
        return self._llm_parse_lineage(sql, output_table)
def _llm_parse_lineage(self, sql: str, output_table: str = None) -> dict:
    """Ask the LLM to extract lineage from a SQL script.

    Only the first 1500 characters of ``sql`` are sent. Errors raised by the
    API call itself are not caught here.

    Args:
        sql: SQL text to analyse.
        output_table: Optional output table name; ``"unknown"`` is sent when
            it is falsy.

    Returns:
        A dict that always has the keys ``sources``, ``targets``,
        ``transforms`` and ``column_mapping``. Values come from the model's
        JSON reply; keys it omits keep their empty defaults. If the reply
        holds no decodable JSON object, every value is empty.
    """
    response = self.llm.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=400,
        messages=[{
            "role": "user",
            "content": f"""Extract data lineage from this SQL.
SQL:
{sql[:1500]}
Output table: {output_table or "unknown"}
Return JSON:
{{
"sources": ["table1", "table2"],
"targets": ["output_table"],
"transforms": ["aggregation", "join"],
"column_mapping": {{"source.col1": "target.col_a"}}
}}"""
        }]
    )
    result = {'sources': [], 'targets': [], 'transforms': [], 'column_mapping': {}}
    try:
        raw = response.content[0].text
        # Replies are often wrapped in a ```json fence or surrounded by a
        # sentence; decode only the outermost {...} span when there is one.
        start, end = raw.find('{'), raw.rfind('}')
        parsed = json.loads(raw[start:end + 1] if 0 <= start < end else raw)
    except (ValueError, IndexError, AttributeError, TypeError):
        return result
    if isinstance(parsed, dict):
        result.update(parsed)
    return result
def _extract_column_mapping(self, sql: str) -> dict:
    """Map aliased source columns to their alias names.

    ``sql`` is parsed as a single statement. Every column reference that has
    an alias expression among its ancestors is recorded as
    ``str(column) -> str(alias name)``. A later occurrence of the same column
    overwrites an earlier one.

    Returns:
        The mapping, or an empty dict if anything raised while parsing or
        walking the tree.
    """
    try:
        tree = sqlglot.parse_one(sql)
        column_to_alias = {}
        for column in tree.find_all(sqlglot.expressions.Column):
            enclosing_alias = column.find_ancestor(sqlglot.expressions.Alias)
            if not enclosing_alias:
                # Column is not under an alias: nothing to record.
                continue
            column_to_alias[str(column)] = str(enclosing_alias.alias)
        return column_to_alias
    except Exception:
        return {}
def add_dbt_lineage(self, manifest_path: str):
    """Import models and their upstream dependencies from a dbt manifest.

    Every manifest node whose ``resource_type`` is ``"model"`` becomes a
    graph node named after the model, with ``type``, ``schema``,
    ``description`` and ``tags`` attributes. Each entry of its
    ``depends_on.nodes`` becomes an edge ``dependency -> model`` with
    ``transform_type='dbt_ref'``.

    Args:
        manifest_path: Path to a dbt ``manifest.json`` file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If a model node has no ``name``.
    """
    # JSON is UTF-8; do not depend on the platform's default encoding.
    with open(manifest_path, encoding='utf-8') as f:
        manifest = json.load(f)

    nodes = manifest.get('nodes') or {}
    sources = manifest.get('sources') or {}

    for node in nodes.values():
        if node.get('resource_type') != 'model':
            continue
        model_name = node['name']

        self.graph.add_node(model_name, **{
            'type': 'dbt_model',
            # Keys may be present with a null value, so ``or ''`` is needed
            # in addition to the ``.get`` default.
            'schema': (node.get('database') or '') + '.' + (node.get('schema') or ''),
            'description': node.get('description', ''),
            'tags': node.get('tags', []),
        })

        # Upstream dependencies.
        for dep in (node.get('depends_on') or {}).get('nodes') or []:
            # Prefer the declared name: the last dotted segment of a unique
            # id is a version suffix (e.g. "v2") for versioned models.
            entry = nodes.get(dep) or sources.get(dep) or {}
            dep_name = entry.get('name') or dep.split('.')[-1]
            self.graph.add_edge(dep_name, model_name,
                                transform_type='dbt_ref')
def build_lineage_from_code(self, python_code: str,
                            file_name: str = "transform.py") -> dict:
    """Ask the LLM to extract lineage from Python ETL code.

    Only the first 1500 characters of ``python_code`` are sent. Errors raised
    by the API call itself are not caught here.

    Args:
        python_code: Source code of the transformation.
        file_name: File name shown to the model for context.

    Returns:
        The JSON object decoded from the model's reply (the prompt asks for
        ``reads_from``, ``writes_to``, ``transforms`` and
        ``column_transforms``), or an empty dict if the reply holds no
        decodable JSON object.
    """
    response = self.llm.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=400,
        messages=[{
            "role": "user",
            "content": f"""Extract data lineage from this Python ETL code.
File: {file_name}
Code:
{python_code[:1500]}
Return JSON:
{{
"reads_from": ["table/file/api names"],
"writes_to": ["output table/file names"],
"transforms": ["description of transformations applied"],
"column_transforms": ["human-readable descriptions of column transformations"]
}}"""
        }]
    )
    try:
        raw = response.content[0].text
        # Replies are often wrapped in a ```json fence or surrounded by a
        # sentence; decode only the outermost {...} span when there is one.
        start, end = raw.find('{'), raw.rfind('}')
        parsed = json.loads(raw[start:end + 1] if 0 <= start < end else raw)
    except (ValueError, IndexError, AttributeError, TypeError):
        return {}
    # The declared return type is a dict; reject arrays and scalars.
    return parsed if isinstance(parsed, dict) else {}
Граф линейки и impact analysis
def get_downstream_impact(self, source_table: str) -> dict:
    """List everything downstream of ``source_table``.

    Walks the graph breadth-first along outgoing edges. Each reachable node
    is reported exactly once, with its hop distance from ``source_table``.
    The source itself is never reported, even if a cycle leads back to it.

    Args:
        source_table: Name of the node to start from.

    Returns:
        A dict with ``source``, ``affected`` (a list of
        ``{'node', 'distance', 'type'}`` dicts ordered by distance) and
        ``count``. If the node is not in the graph, ``affected`` is empty and
        ``count`` is 0.
    """
    if source_table not in self.graph:
        return {'source': source_table, 'affected': [], 'count': 0}

    # First-visit depth in a BFS over an unweighted graph is the shortest hop
    # count, so no separate shortest-path query per node is needed.
    distances = {source_table: 0}
    queue = deque([source_table])
    affected = []

    while queue:
        current = queue.popleft()
        for succ in self.graph.successors(current):
            if succ in distances:
                # Already reached through another parent, or it is the source.
                continue
            distances[succ] = distances[current] + 1
            affected.append({
                'node': succ,
                'distance': distances[succ],
                'type': self.graph.nodes[succ].get('type', 'unknown')
            })
            queue.append(succ)

    # BFS discovers nodes in non-decreasing distance, so the list is sorted.
    return {
        'source': source_table,
        'affected': affected,
        'count': len(affected)
    }
def get_upstream_sources(self, target_table: str) -> dict:
    """Describe where the data in ``target_table`` comes from.

    Collects every ancestor of the node, the shortest path from each ancestor
    to the node, and an LLM-written explanation from ``_explain_lineage``.

    Args:
        target_table: Name of the node to trace back from.

    Returns:
        A dict with ``target``, ``sources`` (ancestor names sorted by their
        string form), ``paths`` (``{ancestor: [node, ..., target]}``) and
        ``explanation``. If the node is not in the graph the same keys are
        returned with empty values, plus the legacy ``path`` key.
    """
    if target_table not in self.graph:
        # Same keys as the normal result so callers can read ``paths`` and
        # ``explanation`` unconditionally. ``path`` is kept for callers that
        # relied on the old key.
        return {
            'target': target_table,
            'sources': [],
            'paths': {},
            'path': [],
            'explanation': ''
        }

    # nx.ancestors returns a set; sort so the result and the prompt built
    # from it are reproducible between runs.
    ancestors = sorted(nx.ancestors(self.graph, target_table), key=str)

    paths = {}
    for source in ancestors:
        try:
            paths[source] = nx.shortest_path(self.graph, source, target_table)
        except nx.NetworkXNoPath:
            # An ancestor has a path by definition; kept as a defensive guard.
            continue

    # LLM-written explanation of the lineage.
    explanation = self._explain_lineage(target_table, ancestors, paths)

    return {
        'target': target_table,
        'sources': ancestors,
        'paths': paths,
        'explanation': explanation
    }
def _explain_lineage(self, target: str, sources: list, paths: dict) -> str:
    """Ask the LLM for a short plain-language summary of a table's lineage.

    Args:
        target: Name of the table being explained.
        sources: Upstream table names; interpolated into the prompt as-is.
        paths: Mapping of source to path. Only the first five entries, in
            dict order, are serialized into the prompt.

    Returns:
        The text of the first content item of the model's reply.
    """
    first_five = dict(list(paths.items())[:5])
    paths_summary = json.dumps(first_five, ensure_ascii=False)

    prompt = f"""Explain the data lineage for table "{target}".
Source tables: {sources}
Key paths: {paths_summary}
Summarize: where data originates, what transformations occur, potential data quality risks.
2-4 sentences, non-technical language."""

    reply = self.llm.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}],
    )
    return reply.content[0].text
def detect_lineage_breaks(self) -> list[dict]:
    """Report nodes whose lineage looks incomplete.

    Two independent checks run for every node of the graph:

    * no incoming edges and a ``type`` other than ``raw_table`` or
      ``external_source`` gives ``no_upstream_lineage`` (warning);
    * no outgoing edges gives ``orphaned_dataset`` (info).

    Returns:
        A list of ``{'node', 'issue', 'severity'}`` dicts in node iteration
        order. A node can appear twice, once per check.
    """
    accepted_root_types = ('raw_table', 'external_source')
    findings = []

    for name in self.graph.nodes():
        # Check 1: nothing feeds this node and it is not a declared root.
        has_no_inputs = self.graph.in_degree(name) == 0
        if has_no_inputs:
            attrs = self.graph.nodes[name]
            if attrs.get('type') not in accepted_root_types:
                findings.append(
                    dict(node=name, issue='no_upstream_lineage', severity='warning')
                )

        # Check 2: nothing consumes this node.
        has_no_consumers = self.graph.out_degree(name) == 0
        if has_no_consumers:
            findings.append(
                dict(node=name, issue='orphaned_dataset', severity='info')
            )

    return findings
Трекинг линейки через парсинг SQL с дополнением LLM покрывает 80–90% типовых ETL-паттернов. Для нетривиальных трансформаций (динамический SQL, ORM) нужен runtime tracing. OpenLineage + Marquez — стандарт для автоматического сбора линейки из Airflow, Spark и dbt без написания кода.







