Настройка Kafka Streams для обработки потоков данных

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.

Разработка и обслуживание любых видов сайтов:

Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка Kafka Streams для обработки потоков данных
Сложная
~5 рабочих дней
Часто задаваемые вопросы

Наши компетенции:

Этапы разработки

Последние работы

  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    874
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    851

Настройка Kafka Streams для обработки потоков данных

Kafka Streams — библиотека для потоковой обработки данных, которая работает как часть обычного Java/Kotlin приложения. Никакого отдельного кластера обработки, никакого YARN или Mesos — только зависимость в pom.xml и обычный JVM-процесс. Это принципиальное отличие от Flink и Spark Streaming, где нужна отдельная инфраструктура.

Архитектурная картина

Kafka Streams читает топики, трансформирует данные, агрегирует, джойнит и пишет результат обратно в Kafka или во внешние системы через Kafka Connect. Состояние хранится локально в RocksDB и реплицируется в changelog-топики Kafka — это даёт отказоустойчивость без внешней базы.

Типичные задачи на сайте: агрегация событий пользователей в реальном времени (DAU, воронки), обогащение потока заказов данными из справочников, fraud detection по паттернам поведения, деривация материализованных представлений из event-sourced данных.

Базовая топология

StreamsBuilder builder = new StreamsBuilder();

KStream<String, UserEvent> events = builder.stream(
    "user-events",
    Consumed.with(Serdes.String(), userEventSerde)
);

// Фильтрация + трансформация
KStream<String, PageView> pageViews = events
    .filter((userId, event) -> event.getType().equals("PAGE_VIEW"))
    .mapValues(event -> PageView.from(event));

// Ветвление потока
Map<String, KStream<String, UserEvent>> branches = events.split(Named.as("branch-"))
    .branch((k, v) -> v.getType().equals("PURCHASE"), Branched.as("purchases"))
    .branch((k, v) -> v.getType().equals("CLICK"), Branched.as("clicks"))
    .defaultBranch(Branched.as("other"));

branches.get("branch-purchases").to("purchase-events");

Агрегации с оконными функциями

Задача — считать количество просмотров страниц по пользователям в скользящем 5-минутном окне:

KTable<Windowed<String>, Long> pageViewCounts = pageViews
    .groupByKey(Grouped.with(Serdes.String(), pageViewSerde))
    .windowedBy(
        SlidingWindows.ofTimeDifferenceAndGrace(
            Duration.ofMinutes(5),
            Duration.ofSeconds(30)  // grace period для поздних событий
        )
    )
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("page-view-counts")
        .withValueSerde(Serdes.Long())
    );

// Publish результатов
pageViewCounts.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),
        new PageViewStat(windowedKey.key(), windowedKey.window().start(), count)
    ))
    .to("page-view-stats", Produced.with(Serdes.String(), pageViewStatSerde));

Типы окон:

  • TumblingWindows — фиксированные непересекающиеся интервалы (00:00–00:05, 00:05–00:10)
  • HoppingWindows — фиксированный размер, скользящий шаг
  • SlidingWindows — двигаются по каждому событию
  • SessionWindows — группировка по промежуткам активности

KTable и материализованные представления

KTable — это changelog-stream, в котором каждый новый record с тем же ключом перезаписывает предыдущий. Используется для справочных данных:

// Обогащение потока событий данными о пользователях
KTable<String, UserProfile> userProfiles = builder.table(
    "user-profiles",
    Materialized.as("user-profiles-store")
);

KStream<String, EnrichedEvent> enriched = events.join(
    userProfiles,
    (event, profile) -> EnrichedEvent.builder()
        .event(event)
        .userName(profile.getName())
        .userSegment(profile.getSegment())
        .build()
);

KTable join работает синхронно — запись из стрима джойнится с текущим состоянием таблицы на момент обработки.

Сериализация с Avro и Schema Registry

В production всегда нужна схема. Avro + Confluent Schema Registry — стандарт:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>7.5.0</version>
</dependency>
Map<String, Object> serdeConfig = Map.of(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
    KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
);

KafkaAvroSerde<UserEvent> userEventSerde = new KafkaAvroSerde<>();
userEventSerde.configure(serdeConfig, false);  // false = value serde, не key

KStream<String, UserEvent> events = builder.stream(
    "user-events",
    Consumed.with(Serdes.String(), userEventSerde)
);

Schema Registry хранит версии схем и обеспечивает совместимость при эволюции. Изменение схемы без регистрации ломает consumer'ов.

Конфигурация приложения

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "site-analytics-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

// Производительность
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// Обработка ошибок
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);

// RocksDB state store
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Interactive Queries — чтение состояния

Kafka Streams позволяет читать state store без round-trip через Kafka:

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-counts",
        QueryableStoreTypes.keyValueStore()
    )
);

// GET /api/stats/user/:userId
Long count = store.get(userId);

// Для windowed store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "page-view-counts-windowed",
        QueryableStoreTypes.windowStore()
    )
);
WindowStoreIterator<Long> iterator = windowStore.fetch(
    userId,
    Instant.now().minus(Duration.ofMinutes(5)),
    Instant.now()
);

При горизонтальном масштабировании state распределён между инстансами. Kafka Streams предоставляет streamsMetadataForKey для определения, на каком хосте живёт нужный ключ — это основа для реализации REST-прокси к распределённому состоянию.

Мониторинг и метрики

Kafka Streams экспортирует метрики через JMX. Для Prometheus — JMX Exporter:

# docker-compose.yml
services:
  streams-app:
    image: my-streams-app:latest
    environment:
      JAVA_OPTS: >
        -javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent.jar=8080:/opt/jmx-exporter/config.yml
        -Xmx2g
        -XX:+UseG1GC

Ключевые метрики:

  • kafka.streams:type=stream-metrics,client-id=* — общие метрики приложения
  • process-rate — записей в секунду
  • process-latency-avg — средняя задержка обработки
  • commit-latency-avg — задержка коммита
  • rocksdb-block-cache-hit-ratio — эффективность кеша RocksDB

Dead Letter Queue

Ошибки десериализации и обработки нужно маршрутизировать, а не просто логировать:

// Кастомный обработчик ошибок
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
    private final Producer<byte[], byte[]> dlqProducer;

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        dlqProducer.send(new ProducerRecord<>("dead-letter-queue", record.key(), record.value()));
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

Сроки

Настройка Kafka + Schema Registry + базовая топология Streams — 3–4 дня. Агрегации с windowed operations и Interactive Queries для REST API — ещё 3–5 дней. Полноценный пайплайн с мониторингом, DLQ, тестами (TopologyTestDriver) и CI/CD — 2–3 недели. Переезд существующей аналитики с batch-обработки на Streams — оценивается отдельно после аудита топиков и схем.