Реализация потоковой обработки данных с AI Stream Processing и ML

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Реализация потоковой обработки данных с AI Stream Processing и ML
Сложный
~2-4 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Реализация AI-обработки потоков данных для ML-пайплайнов

Потоковая обработка для ML — это инференс моделей на непрерывном потоке событий с задержкой менее 100 мс. Fraud detection, real-time рекомендации, динамическое ценообразование — все они требуют онлайн-вычисления признаков и инференса в одном пайплайне без батчевых задержек.

Архитектура потокового ML-пайплайна

[Kafka / Kinesis / Pulsar]
        ↓
[Feature Computation]     ← Flink / Spark Streaming / Kafka Streams
(агрегации, окна, joins)
        ↓
[Feature Store Online]    ← Redis / DynamoDB (< 5ms lookup)
        ↓
[Model Inference]         ← Triton / TorchServe / ONNX Runtime
(< 20ms)
        ↓
[Decision Engine]         ← бизнес-правила + ML score
        ↓
[Action / Output Kafka]   ← downstream системы

Kafka Streams + онлайн-признаки

from confluent_kafka import Consumer, Producer
import json
import redis
import numpy as np
import time
from collections import deque, defaultdict
import threading

class StreamFeatureComputer:
    """Вычисление признаков в реальном времени"""

    def __init__(self, kafka_config: dict, redis_url: str):
        self.consumer = Consumer(kafka_config)
        self.producer = Producer({'bootstrap.servers': kafka_config['bootstrap.servers']})
        self.redis = redis.from_url(redis_url)
        self.window_store = defaultdict(lambda: deque(maxlen=1000))

    def compute_user_features(self, user_id: str, event: dict) -> dict:
        """Online-признаки для пользователя"""
        key_prefix = f"user:{user_id}"
        now = event['timestamp']

        # Sliding window агрегации через Redis
        pipe = self.redis.pipeline()

        # Транзакционные признаки
        event_key = f"{key_prefix}:events"
        pipe.lpush(event_key, json.dumps({
            'amount': event.get('amount', 0),
            'ts': now,
            'type': event.get('type', 'unknown')
        }))
        pipe.ltrim(event_key, 0, 999)  # Держим последние 1000 событий
        pipe.expire(event_key, 86400)  # TTL 24 часа

        pipe.execute()

        # Агрегации за разные окна
        raw_events = self.redis.lrange(event_key, 0, -1)
        events = [json.loads(e) for e in raw_events]

        # Сортировка по времени
        events.sort(key=lambda x: x['ts'], reverse=True)

        window_1h = [e for e in events if now - e['ts'] <= 3600]
        window_24h = [e for e in events if now - e['ts'] <= 86400]

        amounts_1h = [e['amount'] for e in window_1h]
        amounts_24h = [e['amount'] for e in window_24h]

        features = {
            'user_id': user_id,
            'tx_count_1h': len(window_1h),
            'tx_count_24h': len(window_24h),
            'tx_amount_sum_1h': sum(amounts_1h),
            'tx_amount_sum_24h': sum(amounts_24h),
            'tx_amount_avg_1h': np.mean(amounts_1h) if amounts_1h else 0,
            'tx_amount_max_1h': max(amounts_1h) if amounts_1h else 0,
            'tx_amount_std_1h': np.std(amounts_1h) if len(amounts_1h) > 1 else 0,
            'unique_merchants_1h': len(set(e.get('merchant_id') for e in window_1h)),
            'time_since_last_tx': now - events[0]['ts'] if events else 9999,
        }

        return features

    def compute_velocity_features(self, entity_id: str,
                                   event_type: str,
                                   windows: list[int] = [60, 300, 3600]) -> dict:
        """Velocity checks: частота событий за разные окна"""
        features = {}
        now = int(time.time())

        for window in windows:
            key = f"velocity:{entity_id}:{event_type}:{window}"
            # Increment и expire
            pipe = self.redis.pipeline()
            pipe.incr(key)
            pipe.expire(key, window)
            count, _ = pipe.execute()
            features[f"count_{window}s"] = count

        return features

Потоковый инференс

import onnxruntime as ort
import asyncio
from aiohttp import ClientSession

class StreamMLInference:
    """Низколатентный инференс в потоке"""

    def __init__(self, model_path: str, feature_store: redis.Redis):
        # ONNX для максимальной скорости
        opts = ort.SessionOptions()
        opts.inter_op_num_threads = 2
        opts.intra_op_num_threads = 2
        opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        self.session = ort.InferenceSession(
            model_path,
            sess_options=opts,
            providers=['CPUExecutionProvider']
        )
        self.feature_store = feature_store
        self.input_names = [inp.name for inp in self.session.get_inputs()]

    def predict(self, features: dict) -> dict:
        """Инференс < 5ms для tabular модели"""
        # Формирование input tensor
        feature_vector = np.array([[
            features.get(name, 0.0) for name in self.input_names
        ]], dtype=np.float32)

        start = time.perf_counter()
        outputs = self.session.run(None, {self.input_names[0]: feature_vector})
        latency_ms = (time.perf_counter() - start) * 1000

        score = float(outputs[0][0][1])  # Probability of positive class

        return {
            'score': score,
            'decision': 'block' if score > 0.8 else 'review' if score > 0.5 else 'allow',
            'latency_ms': latency_ms
        }

    def batch_predict(self, features_list: list[dict]) -> list[dict]:
        """Батч-инференс для микробатчей"""
        if not features_list:
            return []

        feature_matrix = np.array([
            [f.get(name, 0.0) for name in self.input_names]
            for f in features_list
        ], dtype=np.float32)

        outputs = self.session.run(None, {self.input_names[0]: feature_matrix})
        scores = outputs[0][:, 1].tolist()

        return [
            {'score': s, 'decision': 'block' if s > 0.8 else 'review' if s > 0.5 else 'allow'}
            for s in scores
        ]

Apache Flink пайплайн (Python API)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows
from pyflink.common.time import Time

def build_flink_ml_pipeline():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)

    # Kafka source
    source = KafkaSource.builder() \
        .set_bootstrap_servers("kafka:9092") \
        .set_topics("transactions") \
        .set_group_id("ml-pipeline") \
        .set_value_only_deserializer(JsonRowDeserializationSchema()) \
        .build()

    stream = env.from_source(
        source,
        WatermarkStrategy.for_monotonous_timestamps(),
        "Kafka Source"
    )

    # Вычисление агрегатов за 5-минутное скользящее окно
    windowed = stream \
        .key_by(lambda event: event['user_id']) \
        .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(30))) \
        .aggregate(TransactionAggregator())

    # Присоединение к static features из базы
    enriched = windowed.map(EnrichWithStaticFeatures())

    # ML инференс
    scored = enriched.map(MLScoringFunction())

    # Sink: действия в реальном времени
    sink = KafkaSink.builder() \
        .set_bootstrap_servers("kafka:9092") \
        .set_record_serializer(JsonRowSerializationSchema("ml-decisions")) \
        .build()

    scored.sink_to(sink)

    env.execute("ML Streaming Pipeline")

Мониторинг потокового пайплайна

class StreamPipelineMonitor:
    """Метрики для real-time ML пайплайна"""

    def __init__(self, prometheus_port: int = 8000):
        from prometheus_client import Counter, Histogram, Gauge, start_http_server

        self.events_processed = Counter('ml_events_total', 'Total events processed',
                                        ['decision'])
        self.inference_latency = Histogram('ml_inference_latency_ms',
                                           'Inference latency in milliseconds',
                                           buckets=[1, 5, 10, 20, 50, 100, 500])
        self.feature_lag = Gauge('feature_store_lag_ms',
                                 'Time between event and feature availability')
        self.model_score_dist = Histogram('ml_model_score',
                                          'Distribution of model scores',
                                          buckets=[0.1*i for i in range(11)])

        start_http_server(prometheus_port)

    def record_inference(self, result: dict):
        self.events_processed.labels(decision=result['decision']).inc()
        self.inference_latency.observe(result.get('latency_ms', 0))
        self.model_score_dist.observe(result['score'])

Производительность по сценариям

Сценарий Throughput P99 Latency Инфраструктура
Fraud detection 50K events/sec 45ms 4 CPU pods
Real-time recsys 10K events/sec 80ms 8 CPU pods
Dynamic pricing 2K events/sec 150ms 2 CPU + Redis
Content moderation 500 items/sec 200ms 2 GPU pods

Инициализация типового потокового ML-пайплайна (Kafka + Flink + Redis Feature Store + ONNX) занимает 3-4 недели разработки. Latency бюджет: 20ms на вычисление признаков + 5ms инференс + 10ms доставка решения = 35ms end-to-end.