Разработка Workflow-движка на базе Apache Airflow

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Разработка Workflow-движка на базе Apache Airflow
Сложная
~2-4 недели
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Разработка Workflow-движка на базе Apache Airflow

Apache Airflow — платформа для оркестрации data pipelines и ETL-процессов. Workflow определяются как Python-код в виде DAG (Directed Acyclic Graph). Airflow хранит историю запусков, умеет делать backfill, мониторит состояние задач и поддерживает параллельное выполнение.

Когда Airflow, а не Temporal/Camunda

Airflow оптимизирован для batch-обработки данных:

  • ETL/ELT пайплайны (PostgreSQL → трансформация → Data Warehouse)
  • Ежедневные отчёты и выгрузки
  • ML-пайплайны (подготовка данных → обучение → деплой модели)
  • Периодические агрегации и синхронизации

Для событийно-управляемых бизнес-процессов с human tasks — Temporal или Camunda.

Установка через Helm (Kubernetes)

helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  --set executor=KubernetesExecutor \
  --set postgresql.enabled=true \
  --set redis.enabled=true \
  --values airflow-values.yaml
# airflow-values.yaml
airflow:
  image:
    repository: apache/airflow
    tag: 2.8.0
  config:
    AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: "3"
    AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "30"

dags:
  gitSync:
    enabled: true
    repo: https://github.com/company/airflow-dags.git
    branch: main
    subPath: dags/

DAG — пример ETL-пайплайна

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['[email protected]'],
}

with DAG(
    'daily_orders_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # каждый день в 02:00 UTC
    catchup=False,
    tags=['etl', 'orders'],
    description='Загрузка и трансформация заказов в DWH',
) as dag:

    # Шаг 1: Извлечь данные из production DB
    def extract_orders(**context):
        hook = PostgresHook(postgres_conn_id='production_db')
        ds = context['ds']  # дата выполнения: 2026-03-28

        df = hook.get_pandas_df(f"""
            SELECT o.id, o.customer_id, o.total, o.status,
                   o.created_at, c.email, c.country
            FROM orders o
            JOIN customers c ON c.id = o.customer_id
            WHERE o.created_at::date = '{ds}'
              AND o.status IN ('paid', 'shipped', 'delivered')
        """)

        # Сохранить в XCom для следующего шага
        context['ti'].xcom_push(key='orders_count', value=len(df))
        df.to_parquet(f'/tmp/orders_{ds}.parquet')
        return len(df)

    # Шаг 2: Трансформация
    def transform_orders(**context):
        ds = context['ds']
        df = pd.read_parquet(f'/tmp/orders_{ds}.parquet')

        # Трансформации
        df['order_date'] = pd.to_datetime(df['created_at']).dt.date
        df['revenue_usd'] = df['total'] / 100  # центы → доллары
        df['is_international'] = df['country'] != 'RU'
        df['customer_tier'] = df['revenue_usd'].apply(
            lambda x: 'vip' if x >= 500 else 'regular'
        )

        df.to_parquet(f'/tmp/orders_transformed_{ds}.parquet')

    # Шаг 3: Загрузить в DWH
    def load_to_dwh(**context):
        ds = context['ds']
        df = pd.read_parquet(f'/tmp/orders_transformed_{ds}.parquet')

        hook = PostgresHook(postgres_conn_id='datawarehouse')
        engine = hook.get_sqlalchemy_engine()

        # Upsert в DWH
        df.to_sql('fact_orders', engine, schema='dwh',
                  if_exists='append', index=False,
                  method='multi', chunksize=1000)

    # Шаг 4: Агрегации для дашборда
    aggregate_metrics = PostgresOperator(
        task_id='aggregate_metrics',
        postgres_conn_id='datawarehouse',
        sql="""
        INSERT INTO dwh.daily_metrics (date, total_revenue, orders_count, avg_order)
        SELECT
          '{{ ds }}'::date,
          SUM(revenue_usd),
          COUNT(*),
          AVG(revenue_usd)
        FROM dwh.fact_orders
        WHERE order_date = '{{ ds }}'
        ON CONFLICT (date) DO UPDATE SET
          total_revenue = EXCLUDED.total_revenue,
          orders_count = EXCLUDED.orders_count,
          avg_order = EXCLUDED.avg_order;
        """,
    )

    extract = PythonOperator(task_id='extract_orders', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform_orders', python_callable=transform_orders)
    load = PythonOperator(task_id='load_to_dwh', python_callable=load_to_dwh)

    # Зависимости
    extract >> transform >> load >> aggregate_metrics

Параллельное выполнение

from airflow.utils.task_group import TaskGroup

with TaskGroup('process_regions') as process_regions:
    for region in ['EU', 'US', 'APAC']:
        PythonOperator(
            task_id=f'process_{region.lower()}',
            python_callable=process_region_data,
            op_kwargs={'region': region}
        )

# Параллельно обрабатываем все регионы, потом агрегируем
extract >> process_regions >> aggregate_all

Sensors — ожидание условий

from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor

# Ждать пока файл появится
wait_for_file = FileSensor(
    task_id='wait_for_export',
    filepath='/data/exports/daily_export_{{ ds }}.csv',
    timeout=3600,
    poke_interval=60,
)

# Ждать пока API вернёт успех
wait_for_api = HttpSensor(
    task_id='wait_for_processing',
    http_conn_id='data_api',
    endpoint='/status/{{ ds }}',
    response_check=lambda response: response.json()['status'] == 'ready',
    timeout=1800,
    poke_interval=120,
)

KubernetesExecutor

При KubernetesExecutor каждая задача запускается в отдельном Pod:

# Конфигурация Pod для конкретной задачи
executor_config = {
    'KubernetesExecutor': {
        'request_memory': '2Gi',
        'request_cpu': '500m',
        'limit_memory': '4Gi',
        'image': 'custom-airflow:2.8.0-pandas',  # кастомный образ с зависимостями
    }
}

heavy_transform = PythonOperator(
    task_id='heavy_transform',
    python_callable=transform_large_dataset,
    executor_config=executor_config
)

Backfill

Пересчитать исторические данные за прошедший период:

airflow dags backfill daily_orders_etl \
  --start-date 2026-01-01 \
  --end-date 2026-03-27 \
  --reset-dagruns

Сроки реализации

  • Airflow deployment (Helm/Docker) + первый DAG — 3–5 дней
  • ETL-пайплайн с 5–8 задачами, трансформациями и DWH-загрузкой — 1–2 недели
  • Сложный пайплайн с параллелизмом, sensors и backfill — 2–4 недели