Проектирование архитектуры Data Pipeline для AI
Data Pipeline для AI — это система, которая трансформирует сырые данные из разных источников в готовые к обучению датасеты и признаки для инференса. Правильная архитектура определяет масштабируемость, надёжность и скорость итерации команды ML.
Типы AI Data Pipelines
Batch Pipeline — обработка данных большими порциями по расписанию. Подходит для обучения моделей, подготовки обучающих выборок, ежедневных рекомендаций.
Streaming Pipeline — обработка событий в реальном времени. Подходит для real-time рекомендаций, детекции фрода, динамического ценообразования.
Lambda Architecture — комбинация batch и streaming. Batch слой даёт точность, speed layer — актуальность.
Kappa Architecture — всё через streaming, batch — это просто replay исторических событий. Проще операционно, требует более мощного streaming движка.
Компоненты архитектуры
Data Sources:
├── Operational DBs (PostgreSQL, MySQL) → CDC (Debezium)
├── Event Streams (Kafka) → Direct consumption
├── File Storage (S3) → Batch ingestion
├── APIs (REST, GraphQL) → Connectors (Airbyte, Fivetran)
└── ML Feedback (predictions) → Kafka events
Processing Layer:
├── Batch: Apache Spark / dbt / pandas
├── Streaming: Apache Flink / Spark Structured Streaming
└── Feature computation: Feast / Tecton
Storage Layer:
├── Raw Zone (S3/GCS): исходные данные без изменений
├── Curated Zone (Delta Lake/Iceberg): очищенные данные
├── Feature Store: готовые признаки для ML
└── ML Artifacts (S3 + DVC): датасеты для обучения
Orchestration:
└── Apache Airflow / Prefect / Dagster
Инкрементальная обработка
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
schedule_interval='@hourly',
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)}
)
def user_features_pipeline():
@task
def extract_events(execution_date=None):
# Инкрементальная загрузка: только новые события
watermark = get_watermark('user_events')
events = clickhouse.query(
"SELECT * FROM user_events WHERE event_time > %(watermark)s",
{'watermark': watermark}
)
update_watermark('user_events', events['event_time'].max())
return events.to_parquet()
@task
def compute_features(events_path: str):
events = pd.read_parquet(events_path)
# Вычисление признаков
features = events.groupby('user_id').agg({
'event_time': 'max',
'event_type': 'count',
'session_duration': ['mean', 'sum'],
}).reset_index()
features.columns = [
'user_id', 'last_activity', 'event_count',
'avg_session_duration', 'total_session_time'
]
return features.to_parquet()
@task
def materialize_to_feature_store(features_path: str):
features = pd.read_parquet(features_path)
feast_store.write_to_online_store('user_features', features)
feast_store.write_to_offline_store('user_features', features)
events = extract_events()
features = compute_features(events)
materialize_to_feature_store(features)
pipeline = user_features_pipeline()
Data Quality в Pipeline
from great_expectations.core import ExpectationSuite
class DataQualityValidator:
def __init__(self, suite_name: str):
self.context = great_expectations.get_context()
self.suite = self.context.get_expectation_suite(suite_name)
def validate(self, df: pd.DataFrame) -> ValidationResult:
validator = self.context.get_validator(
batch_request=RuntimeBatchRequest(
datasource_name="pandas_datasource",
data_connector_name="runtime",
data_asset_name="ml_features",
runtime_parameters={"batch_data": df},
batch_identifiers={"run_id": str(uuid.uuid4())}
),
expectation_suite=self.suite
)
results = validator.validate()
if not results.success:
failed = [r for r in results.results if not r.success]
raise DataQualityError(f"Validation failed: {failed}")
return results
Обработка Schema Evolution
# Delta Lake поддерживает schema evolution:
from delta import DeltaTable
DeltaTable.forPath(spark, "s3://bucket/user_features") \
.toDF() \
.mergeSchema(new_schema) \
.write \
.option("mergeSchema", "true") \
.format("delta") \
.mode("append") \
.save("s3://bucket/user_features")
Мониторинг Pipeline
Ключевые метрики: freshness (задержка данных относительно источника), completeness (% ожидаемых записей получен), latency (время выполнения каждого этапа), error rate. Алерты: данные не обновлялись > N часов, uptime упал ниже порога, количество записей аномально отличается от ожидаемого.
Типичный результат проектирования: ML-команда получает свежие признаки каждые 15-60 минут вместо ежедневных batch расчётов, время подготовки новой обучающей выборки сокращается с нескольких часов до 10-15 минут.







