AI-система отслеживания происхождения данных (Data Lineage AI)

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1 услугВсе 1566 услуг
AI-система отслеживания происхождения данных (Data Lineage AI)
Средняя
~2-4 недели
Часто задаваемые вопросы
Направления AI-разработки
Этапы разработки AI-решения
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1246
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1170
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    873
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1092
  • image_logo-advance_0.png
    Разработка логотипа компании B2B Advance
    563
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    830

Реализация AI-трекинга линейки данных (Data Lineage)

Data Lineage — это граф: откуда данные пришли, через какие трансформации прошли, куда попали. Без него при ошибке в колонке невозможно быстро найти источник проблемы. AI-линейка автоматически строит этот граф из SQL, dbt моделей и кода трансформаций без ручной документации.

Парсинг SQL для линейки

from anthropic import Anthropic
import sqlparse
import sqlglot
import networkx as nx
import json
from dataclasses import dataclass

@dataclass
class LineageNode:
    node_id: str
    name: str
    node_type: str  # table, view, query, model, api, file
    schema: dict = None
    metadata: dict = None

@dataclass
class LineageEdge:
    source: str
    target: str
    transform_type: str  # select, join, aggregation, filter, union
    columns_mapped: dict = None  # {source_col: target_col}

class DataLineageTracker:
    def __init__(self):
        self.llm = Anthropic()
        self.graph = nx.DiGraph()
        self.nodes = {}

    def parse_sql_lineage(self, sql: str, output_table: str = None) -> dict:
        """Извлечение линейки из SQL запроса"""
        try:
            # Парсинг через sqlglot
            statements = sqlglot.parse(sql)
            lineage = {'sources': [], 'targets': [], 'columns': {}}

            for stmt in statements:
                # Таблицы в FROM и JOIN
                for table in stmt.find_all(sqlglot.expressions.Table):
                    if table.name:
                        lineage['sources'].append(table.name)

                # Целевая таблица (CREATE TABLE AS / INSERT INTO)
                if isinstance(stmt, sqlglot.expressions.Create):
                    lineage['targets'].append(str(stmt.this))
                elif isinstance(stmt, sqlglot.expressions.Insert):
                    lineage['targets'].append(str(stmt.this))

            if output_table:
                lineage['targets'].append(output_table)

            # Маппинг колонок через LLM для сложных случаев
            lineage['column_mapping'] = self._extract_column_mapping(sql)

            return lineage

        except Exception:
            # Fallback: LLM-парсинг
            return self._llm_parse_lineage(sql, output_table)

    def _llm_parse_lineage(self, sql: str, output_table: str = None) -> dict:
        """LLM-извлечение линейки для сложного SQL"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": f"""Extract data lineage from this SQL.

SQL:
{sql[:1500]}

Output table: {output_table or "unknown"}

Return JSON:
{{
  "sources": ["table1", "table2"],
  "targets": ["output_table"],
  "transforms": ["aggregation", "join"],
  "column_mapping": {{"source.col1": "target.col_a"}}
}}"""
            }]
        )
        try:
            return json.loads(response.content[0].text)
        except Exception:
            return {'sources': [], 'targets': [], 'transforms': [], 'column_mapping': {}}

    def _extract_column_mapping(self, sql: str) -> dict:
        """Маппинг колонок источник → цель"""
        try:
            parsed = sqlglot.parse_one(sql)
            mapping = {}

            for col in parsed.find_all(sqlglot.expressions.Column):
                alias = col.find_ancestor(sqlglot.expressions.Alias)
                if alias:
                    target_name = str(alias.alias)
                    source_name = str(col)
                    mapping[source_name] = target_name

            return mapping
        except Exception:
            return {}

    def add_dbt_lineage(self, manifest_path: str):
        """Импорт линейки из dbt manifest.json"""
        with open(manifest_path) as f:
            manifest = json.load(f)

        for node_id, node in manifest.get('nodes', {}).items():
            if node.get('resource_type') == 'model':
                model_name = node['name']

                # Добавление узла модели
                self.graph.add_node(model_name, **{
                    'type': 'dbt_model',
                    'schema': node.get('database', '') + '.' + node.get('schema', ''),
                    'description': node.get('description', ''),
                    'tags': node.get('tags', [])
                })

                # Зависимости (upstream)
                for dep in node.get('depends_on', {}).get('nodes', []):
                    dep_name = dep.split('.')[-1]
                    self.graph.add_edge(dep_name, model_name,
                                        transform_type='dbt_ref')

    def build_lineage_from_code(self, python_code: str,
                                  file_name: str = "transform.py") -> dict:
        """Извлечение линейки из Python кода трансформации"""
        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": f"""Extract data lineage from this Python ETL code.

File: {file_name}
Code:
{python_code[:1500]}

Return JSON:
{{
  "reads_from": ["table/file/api names"],
  "writes_to": ["output table/file names"],
  "transforms": ["description of transformations applied"],
  "column_transforms": ["human-readable descriptions of column transformations"]
}}"""
            }]
        )
        try:
            return json.loads(response.content[0].text)
        except Exception:
            return {}

Граф линейки и impact analysis

    def get_downstream_impact(self, source_table: str) -> dict:
        """Что сломается если изменить source_table"""
        if source_table not in self.graph:
            return {'affected': [], 'count': 0}

        # BFS для получения всех downstream узлов
        affected = []
        visited = set()
        queue = [source_table]

        while queue:
            current = queue.pop(0)
            if current in visited:
                continue
            visited.add(current)

            successors = list(self.graph.successors(current))
            for succ in successors:
                if succ != source_table:
                    affected.append({
                        'node': succ,
                        'distance': nx.shortest_path_length(self.graph, source_table, succ),
                        'type': self.graph.nodes[succ].get('type', 'unknown')
                    })
                queue.extend(successors)

        affected.sort(key=lambda x: x['distance'])
        return {
            'source': source_table,
            'affected': affected,
            'count': len(affected)
        }

    def get_upstream_sources(self, target_table: str) -> dict:
        """Откуда данные в target_table"""
        if target_table not in self.graph:
            return {'sources': [], 'path': []}

        # Все предки
        ancestors = list(nx.ancestors(self.graph, target_table))
        paths = {}

        for source in ancestors:
            try:
                path = nx.shortest_path(self.graph, source, target_table)
                paths[source] = path
            except nx.NetworkXNoPath:
                pass

        # AI-объяснение линейки
        explanation = self._explain_lineage(target_table, ancestors, paths)

        return {
            'target': target_table,
            'sources': ancestors,
            'paths': paths,
            'explanation': explanation
        }

    def _explain_lineage(self, target: str, sources: list, paths: dict) -> str:
        """LLM-объяснение линейки данных"""
        paths_summary = json.dumps(
            {s: p for s, p in list(paths.items())[:5]},
            ensure_ascii=False
        )

        response = self.llm.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"""Explain the data lineage for table "{target}".

Source tables: {sources}
Key paths: {paths_summary}

Summarize: where data originates, what transformations occur, potential data quality risks.
2-4 sentences, non-technical language."""
            }]
        )
        return response.content[0].text

    def detect_lineage_breaks(self) -> list[dict]:
        """Обнаружение разрывов в линейке"""
        breaks = []

        # Таблицы без источников (кроме raw)
        for node in self.graph.nodes():
            if self.graph.in_degree(node) == 0:
                node_data = self.graph.nodes[node]
                if node_data.get('type') not in ['raw_table', 'external_source']:
                    breaks.append({
                        'node': node,
                        'issue': 'no_upstream_lineage',
                        'severity': 'warning'
                    })

            # Таблицы без потребителей
            if self.graph.out_degree(node) == 0:
                breaks.append({
                    'node': node,
                    'issue': 'orphaned_dataset',
                    'severity': 'info'
                })

        return breaks

Трекинг линейки через парсинг SQL + дополнение LLM покрывает 80-90% типовых ETL-паттернов. Для сложных tricky трансформаций (динамический SQL, ORM) нужен runtime tracing. OpenLineage + Marquez — стандарт для автоматического сбора линейки из Airflow, Spark и dbt без написания кода.