Разработка Workflow-движка на базе Apache Airflow
Apache Airflow — платформа для оркестрации data pipelines и ETL-процессов. Workflow определяются как Python-код в виде DAG (Directed Acyclic Graph). Airflow хранит историю запусков, умеет делать backfill, мониторит состояние задач и поддерживает параллельное выполнение.
Когда Airflow, а не Temporal/Camunda
Airflow оптимизирован для batch-обработки данных:
- ETL/ELT пайплайны (PostgreSQL → трансформация → Data Warehouse)
- Ежедневные отчёты и выгрузки
- ML-пайплайны (подготовка данных → обучение → деплой модели)
- Периодические агрегации и синхронизации
Для событийно-управляемых бизнес-процессов с human tasks — Temporal или Camunda.
Установка через Helm (Kubernetes)
helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--set executor=KubernetesExecutor \
--set postgresql.enabled=true \
--set redis.enabled=true \
--values airflow-values.yaml
# airflow-values.yaml
airflow:
image:
repository: apache/airflow
tag: 2.8.0
config:
AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: "3"
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "30"
dags:
gitSync:
enabled: true
repo: https://github.com/company/airflow-dags.git
branch: main
subPath: dags/
DAG — пример ETL-пайплайна
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2026, 1, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['[email protected]'],
}
with DAG(
'daily_orders_etl',
default_args=default_args,
schedule_interval='0 2 * * *', # каждый день в 02:00 UTC
catchup=False,
tags=['etl', 'orders'],
description='Загрузка и трансформация заказов в DWH',
) as dag:
# Шаг 1: Извлечь данные из production DB
def extract_orders(**context):
hook = PostgresHook(postgres_conn_id='production_db')
ds = context['ds'] # дата выполнения: 2026-03-28
df = hook.get_pandas_df(f"""
SELECT o.id, o.customer_id, o.total, o.status,
o.created_at, c.email, c.country
FROM orders o
JOIN customers c ON c.id = o.customer_id
WHERE o.created_at::date = '{ds}'
AND o.status IN ('paid', 'shipped', 'delivered')
""")
# Сохранить в XCom для следующего шага
context['ti'].xcom_push(key='orders_count', value=len(df))
df.to_parquet(f'/tmp/orders_{ds}.parquet')
return len(df)
# Шаг 2: Трансформация
def transform_orders(**context):
ds = context['ds']
df = pd.read_parquet(f'/tmp/orders_{ds}.parquet')
# Трансформации
df['order_date'] = pd.to_datetime(df['created_at']).dt.date
df['revenue_usd'] = df['total'] / 100 # центы → доллары
df['is_international'] = df['country'] != 'RU'
df['customer_tier'] = df['revenue_usd'].apply(
lambda x: 'vip' if x >= 500 else 'regular'
)
df.to_parquet(f'/tmp/orders_transformed_{ds}.parquet')
# Шаг 3: Загрузить в DWH
def load_to_dwh(**context):
ds = context['ds']
df = pd.read_parquet(f'/tmp/orders_transformed_{ds}.parquet')
hook = PostgresHook(postgres_conn_id='datawarehouse')
engine = hook.get_sqlalchemy_engine()
# Upsert в DWH
df.to_sql('fact_orders', engine, schema='dwh',
if_exists='append', index=False,
method='multi', chunksize=1000)
# Шаг 4: Агрегации для дашборда
aggregate_metrics = PostgresOperator(
task_id='aggregate_metrics',
postgres_conn_id='datawarehouse',
sql="""
INSERT INTO dwh.daily_metrics (date, total_revenue, orders_count, avg_order)
SELECT
'{{ ds }}'::date,
SUM(revenue_usd),
COUNT(*),
AVG(revenue_usd)
FROM dwh.fact_orders
WHERE order_date = '{{ ds }}'
ON CONFLICT (date) DO UPDATE SET
total_revenue = EXCLUDED.total_revenue,
orders_count = EXCLUDED.orders_count,
avg_order = EXCLUDED.avg_order;
""",
)
extract = PythonOperator(task_id='extract_orders', python_callable=extract_orders)
transform = PythonOperator(task_id='transform_orders', python_callable=transform_orders)
load = PythonOperator(task_id='load_to_dwh', python_callable=load_to_dwh)
# Зависимости
extract >> transform >> load >> aggregate_metrics
Параллельное выполнение
from airflow.utils.task_group import TaskGroup
with TaskGroup('process_regions') as process_regions:
for region in ['EU', 'US', 'APAC']:
PythonOperator(
task_id=f'process_{region.lower()}',
python_callable=process_region_data,
op_kwargs={'region': region}
)
# Параллельно обрабатываем все регионы, потом агрегируем
extract >> process_regions >> aggregate_all
Sensors — ожидание условий
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
# Ждать пока файл появится
wait_for_file = FileSensor(
task_id='wait_for_export',
filepath='/data/exports/daily_export_{{ ds }}.csv',
timeout=3600,
poke_interval=60,
)
# Ждать пока API вернёт успех
wait_for_api = HttpSensor(
task_id='wait_for_processing',
http_conn_id='data_api',
endpoint='/status/{{ ds }}',
response_check=lambda response: response.json()['status'] == 'ready',
timeout=1800,
poke_interval=120,
)
KubernetesExecutor
При KubernetesExecutor каждая задача запускается в отдельном Pod:
# Конфигурация Pod для конкретной задачи
executor_config = {
'KubernetesExecutor': {
'request_memory': '2Gi',
'request_cpu': '500m',
'limit_memory': '4Gi',
'image': 'custom-airflow:2.8.0-pandas', # кастомный образ с зависимостями
}
}
heavy_transform = PythonOperator(
task_id='heavy_transform',
python_callable=transform_large_dataset,
executor_config=executor_config
)
Backfill
Пересчитать исторические данные за прошедший период:
airflow dags backfill daily_orders_etl \
--start-date 2026-01-01 \
--end-date 2026-03-27 \
--reset-dagruns
Сроки реализации
- Airflow deployment (Helm/Docker) + первый DAG — 3–5 дней
- ETL-пайплайн с 5–8 задачами, трансформациями и DWH-загрузкой — 1–2 недели
- Сложный пайплайн с параллелизмом, sensors и backfill — 2–4 недели







