Разработка продюсеров и консьюмеров Kafka для веб-приложения
Kafka-клиенты в продуктиве — это не просто «отправить сообщение» и «получить сообщение». Это гарантии доставки, идемпотентность, обработка rebalance, управление offset'ами и изоляция от сбоев брокера.
Продюсер: гарантии доставки
Три режима надёжности (acks):
-
acks=0— fire and forget, потеря данных возможна -
acks=1— лидер подтвердил, но реплики могут не успеть -
acks=all(или-1) — все ISR-реплики подтвердили, потеря данных невозможна приmin.insync.replicas=2
Для финансовых транзакций и критичных событий — только acks=all с enable.idempotence=true.
// Java — идемпотентный продюсер
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// Надёжность
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max при idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
// Производительность
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB батч
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // ждём до 5ms для наполнения батча
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67_108_864); // 64MB буфер
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);
Отправка с обработкой ошибок:
public CompletableFuture<RecordMetadata> sendOrderEvent(OrderEvent event) {
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
"order-events",
event.getOrderId(), // ключ — все события одного заказа в одну партицию
event
);
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// Kafka сам ретраит — этот блок не должен срабатываться при правильном config
log.error("Retriable error, Kafka will retry: {}", exception.getMessage());
} else {
// Non-retriable: Authorization, RecordTooLarge, SerializationException
log.error("Fatal producer error for order {}: {}", event.getOrderId(), exception.getMessage());
future.completeExceptionally(exception);
}
} else {
log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
future.complete(metadata);
}
});
return future;
}
Транзакционный продюсер
Когда нужно атомарно записать в несколько топиков (например, результат обработки + уведомление):
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-instance-1");
// transactional.id должен быть уникальным для каждого экземпляра продюсера
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders-processed", orderId, processedOrder));
producer.send(new ProducerRecord<>("order-notifications", userId, notification));
// Commit offsets консьюмера в рамках той же транзакции (exactly-once)
producer.sendOffsetsToTransaction(currentOffsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
producer.close(); // Этот экземпляр уже невалиден
throw e;
} catch (KafkaException e) {
producer.abortTransaction();
throw e;
}
Консьюмер: правильное управление офсетами
Auto-commit скрывает ошибки: сообщение помечается обработанным ещё до того, как приложение его реально обработало. При сбое — потеря данных.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
consumerProps.put("specific.avro.reader", true);
// Отключаем auto-commit
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Session timeout — если брокер не получает heartbeat за это время, считает консьюмера мёртвым
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
// Сколько записей получаем за poll
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Максимальное время между poll — если превышено, брокер считает консьюмера мёртвым
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
Poll-loop с ручным commit:
KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-events"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Перед rebalance — сохраняем текущее состояние обработки
commitCurrentOffsets();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// После rebalance — инициализируем состояние для новых партиций
log.info("Assigned partitions: {}", partitions);
}
});
Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
try {
while (!shutdown.get()) {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, OrderEvent> record : records) {
try {
processOrder(record.value());
pendingOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (NonRetriableException e) {
// Отправляем в DLQ и коммитим
sendToDlq(record, e);
pendingOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// RetriableException — не коммитим, poll вернёт это сообщение снова
}
if (!pendingOffsets.isEmpty()) {
consumer.commitSync(pendingOffsets);
pendingOffsets.clear();
}
}
} finally {
consumer.close();
}
Параллельная обработка без потери порядка
Один поток poll + пул воркеров по ключу:
// Сохраняем порядок событий одного заказа, параллелим между заказами
Map<Integer, BlockingQueue<ConsumerRecord<String, OrderEvent>>> partitionQueues = new HashMap<>();
ExecutorService workers = Executors.newFixedThreadPool(12);
// При получении записей — роутим в очередь по партиции
for (ConsumerRecord<String, OrderEvent> record : records) {
int partitionIndex = record.partition() % NUM_WORKERS;
workerQueues.get(partitionIndex).offer(record);
}
// Каждый воркер обрабатывает свою очередь последовательно
// → порядок внутри ключа сохраняется
Python-клиент (confluent-kafka-python)
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
# Продюсер
producer = Producer({
'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
'acks': 'all',
'enable.idempotence': True,
'compression.type': 'lz4',
'batch.size': 65536,
'linger.ms': 5,
'retries': 2147483647,
'delivery.timeout.ms': 120000,
})
def delivery_report(err, msg):
if err is not None:
print(f'Delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer.produce(
'user-events',
key=str(user_id),
value=json.dumps(event).encode('utf-8'),
callback=delivery_report
)
producer.flush() # дожидаемся доставки всех pending сообщений
# Консьюмер
consumer = Consumer({
'bootstrap.servers': 'kafka-1:9092',
'group.id': 'web-app-consumer',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 30000,
})
consumer.subscribe(['user-events'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError.PARTITION_EOF:
continue
raise KafkaException(msg.error())
event = json.loads(msg.value().decode('utf-8'))
process_event(event)
consumer.commit(asynchronous=False)
finally:
consumer.close()
Таймлайн
День 1 — проектирование: определяем топики, ключи сообщений (для гарантии порядка), формат сообщений (Avro/JSON/Protobuf), группы консьюмеров.
День 2 — разработка продюсеров: сериализация, настройка acks/idempotence, интеграция с бизнес-логикой приложения.
День 3 — разработка консьюмеров: ручное управление офсетами, обработка rebalance, Dead Letter Queue для ошибочных сообщений.
День 4–5 — тестирование: unit-тесты с EmbeddedKafka или Testcontainers, интеграционные тесты, нагрузочный тест с kafka-producer-perf-test, проверка exactly-once семантики.
День 6 — деплой, настройка мониторинга consumer lag, алерты.







