Настройка Kafka Streams для обработки потоков данных
Kafka Streams — библиотека для потоковой обработки данных, которая работает как часть обычного Java/Kotlin приложения. Никакого отдельного кластера обработки, никакого YARN или Mesos — только зависимость в pom.xml и обычный JVM-процесс. Это принципиальное отличие от Flink и Spark Streaming, где нужна отдельная инфраструктура.
Архитектурная картина
Kafka Streams читает топики, трансформирует данные, агрегирует, джойнит и пишет результат обратно в Kafka или во внешние системы через Kafka Connect. Состояние хранится локально в RocksDB и реплицируется в changelog-топики Kafka — это даёт отказоустойчивость без внешней базы.
Типичные задачи на сайте: агрегация событий пользователей в реальном времени (DAU, воронки), обогащение потока заказов данными из справочников, fraud detection по паттернам поведения, деривация материализованных представлений из event-sourced данных.
Базовая топология
StreamsBuilder builder = new StreamsBuilder();
KStream<String, UserEvent> events = builder.stream(
"user-events",
Consumed.with(Serdes.String(), userEventSerde)
);
// Фильтрация + трансформация
KStream<String, PageView> pageViews = events
.filter((userId, event) -> event.getType().equals("PAGE_VIEW"))
.mapValues(event -> PageView.from(event));
// Ветвление потока
Map<String, KStream<String, UserEvent>> branches = events.split(Named.as("branch-"))
.branch((k, v) -> v.getType().equals("PURCHASE"), Branched.as("purchases"))
.branch((k, v) -> v.getType().equals("CLICK"), Branched.as("clicks"))
.defaultBranch(Branched.as("other"));
branches.get("branch-purchases").to("purchase-events");
Агрегации с оконными функциями
Задача — считать количество просмотров страниц по пользователям в скользящем 5-минутном окне:
KTable<Windowed<String>, Long> pageViewCounts = pageViews
.groupByKey(Grouped.with(Serdes.String(), pageViewSerde))
.windowedBy(
SlidingWindows.ofTimeDifferenceAndGrace(
Duration.ofMinutes(5),
Duration.ofSeconds(30) // grace period для поздних событий
)
)
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("page-view-counts")
.withValueSerde(Serdes.Long())
);
// Publish результатов
pageViewCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(),
new PageViewStat(windowedKey.key(), windowedKey.window().start(), count)
))
.to("page-view-stats", Produced.with(Serdes.String(), pageViewStatSerde));
Типы окон:
-
TumblingWindows— фиксированные непересекающиеся интервалы (00:00–00:05, 00:05–00:10) -
HoppingWindows— фиксированный размер, скользящий шаг -
SlidingWindows— двигаются по каждому событию -
SessionWindows— группировка по промежуткам активности
KTable и материализованные представления
KTable — это changelog-stream, в котором каждый новый record с тем же ключом перезаписывает предыдущий. Используется для справочных данных:
// Обогащение потока событий данными о пользователях
KTable<String, UserProfile> userProfiles = builder.table(
"user-profiles",
Materialized.as("user-profiles-store")
);
KStream<String, EnrichedEvent> enriched = events.join(
userProfiles,
(event, profile) -> EnrichedEvent.builder()
.event(event)
.userName(profile.getName())
.userSegment(profile.getSegment())
.build()
);
KTable join работает синхронно — запись из стрима джойнится с текущим состоянием таблицы на момент обработки.
Сериализация с Avro и Schema Registry
В production всегда нужна схема. Avro + Confluent Schema Registry — стандарт:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>7.5.0</version>
</dependency>
Map<String, Object> serdeConfig = Map.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
);
KafkaAvroSerde<UserEvent> userEventSerde = new KafkaAvroSerde<>();
userEventSerde.configure(serdeConfig, false); // false = value serde, не key
KStream<String, UserEvent> events = builder.stream(
"user-events",
Consumed.with(Serdes.String(), userEventSerde)
);
Schema Registry хранит версии схем и обеспечивает совместимость при эволюции. Изменение схемы без регистрации ломает consumer'ов.
Конфигурация приложения
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "site-analytics-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
// Производительность
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10MB
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Обработка ошибок
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// RocksDB state store
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Interactive Queries — чтение состояния
Kafka Streams позволяет читать state store без round-trip через Kafka:
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"page-view-counts",
QueryableStoreTypes.keyValueStore()
)
);
// GET /api/stats/user/:userId
Long count = store.get(userId);
// Для windowed store
ReadOnlyWindowStore<String, Long> windowStore = streams.store(
StoreQueryParameters.fromNameAndType(
"page-view-counts-windowed",
QueryableStoreTypes.windowStore()
)
);
WindowStoreIterator<Long> iterator = windowStore.fetch(
userId,
Instant.now().minus(Duration.ofMinutes(5)),
Instant.now()
);
При горизонтальном масштабировании state распределён между инстансами. Kafka Streams предоставляет streamsMetadataForKey для определения, на каком хосте живёт нужный ключ — это основа для реализации REST-прокси к распределённому состоянию.
Мониторинг и метрики
Kafka Streams экспортирует метрики через JMX. Для Prometheus — JMX Exporter:
# docker-compose.yml
services:
streams-app:
image: my-streams-app:latest
environment:
JAVA_OPTS: >
-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent.jar=8080:/opt/jmx-exporter/config.yml
-Xmx2g
-XX:+UseG1GC
Ключевые метрики:
-
kafka.streams:type=stream-metrics,client-id=*— общие метрики приложения -
process-rate— записей в секунду -
process-latency-avg— средняя задержка обработки -
commit-latency-avg— задержка коммита -
rocksdb-block-cache-hit-ratio— эффективность кеша RocksDB
Dead Letter Queue
Ошибки десериализации и обработки нужно маршрутизировать, а не просто логировать:
// Кастомный обработчик ошибок
public class DlqProductionExceptionHandler implements ProductionExceptionHandler {
private final Producer<byte[], byte[]> dlqProducer;
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
dlqProducer.send(new ProducerRecord<>("dead-letter-queue", record.key(), record.value()));
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
Сроки
Настройка Kafka + Schema Registry + базовая топология Streams — 3–4 дня. Агрегации с windowed operations и Interactive Queries для REST API — ещё 3–5 дней. Полноценный пайплайн с мониторингом, DLQ, тестами (TopologyTestDriver) и CI/CD — 2–3 недели. Переезд существующей аналитики с batch-обработки на Streams — оценивается отдельно после аудита топиков и схем.







