Разработка продюсеров и консьюмеров Kafka для веб-приложения

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Разработка продюсеров и консьюмеров Kafka для веб-приложения
Сложная
~3-5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Разработка продюсеров и консьюмеров Kafka для веб-приложения

Kafka-клиенты в продуктиве — это не просто «отправить сообщение» и «получить сообщение». Это гарантии доставки, идемпотентность, обработка rebalance, управление offset'ами и изоляция от сбоев брокера.

Продюсер: гарантии доставки

Три режима надёжности (acks):

  • acks=0 — fire and forget, потеря данных возможна
  • acks=1 — лидер подтвердил, но реплики могут не успеть
  • acks=all (или -1) — все ISR-реплики подтвердили, потеря данных невозможна при min.insync.replicas=2

Для финансовых транзакций и критичных событий — только acks=all с enable.idempotence=true.

// Java — идемпотентный продюсер
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");

// Надёжность
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // max при idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);

// Производительность
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);          // 64KB батч
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // ждём до 5ms для наполнения батча
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67_108_864);  // 64MB буфер
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

Отправка с обработкой ошибок:

public CompletableFuture<RecordMetadata> sendOrderEvent(OrderEvent event) {
    ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "order-events",
        event.getOrderId(),  // ключ — все события одного заказа в одну партицию
        event
    );

    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            if (exception instanceof RetriableException) {
                // Kafka сам ретраит — этот блок не должен срабатываться при правильном config
                log.error("Retriable error, Kafka will retry: {}", exception.getMessage());
            } else {
                // Non-retriable: Authorization, RecordTooLarge, SerializationException
                log.error("Fatal producer error for order {}: {}", event.getOrderId(), exception.getMessage());
                future.completeExceptionally(exception);
            }
        } else {
            log.debug("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
            future.complete(metadata);
        }
    });

    return future;
}

Транзакционный продюсер

Когда нужно атомарно записать в несколько топиков (например, результат обработки + уведомление):

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-instance-1");
// transactional.id должен быть уникальным для каждого экземпляра продюсера

producer.initTransactions();

try {
    producer.beginTransaction();

    producer.send(new ProducerRecord<>("orders-processed", orderId, processedOrder));
    producer.send(new ProducerRecord<>("order-notifications", userId, notification));

    // Commit offsets консьюмера в рамках той же транзакции (exactly-once)
    producer.sendOffsetsToTransaction(currentOffsets, consumerGroupMetadata);

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    producer.close(); // Этот экземпляр уже невалиден
    throw e;
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
}

Консьюмер: правильное управление офсетами

Auto-commit скрывает ошибки: сообщение помечается обработанным ещё до того, как приложение его реально обработало. При сбое — потеря данных.

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://schema-registry:8081");
consumerProps.put("specific.avro.reader", true);

// Отключаем auto-commit
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Session timeout — если брокер не получает heartbeat за это время, считает консьюмера мёртвым
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);

// Сколько записей получаем за poll
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// Максимальное время между poll — если превышено, брокер считает консьюмера мёртвым
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

Poll-loop с ручным commit:

KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Перед rebalance — сохраняем текущее состояние обработки
        commitCurrentOffsets();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // После rebalance — инициализируем состояние для новых партиций
        log.info("Assigned partitions: {}", partitions);
    }
});

Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();

try {
    while (!shutdown.get()) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, OrderEvent> record : records) {
            try {
                processOrder(record.value());

                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            } catch (NonRetriableException e) {
                // Отправляем в DLQ и коммитим
                sendToDlq(record, e);
                pendingOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            }
            // RetriableException — не коммитим, poll вернёт это сообщение снова
        }

        if (!pendingOffsets.isEmpty()) {
            consumer.commitSync(pendingOffsets);
            pendingOffsets.clear();
        }
    }
} finally {
    consumer.close();
}

Параллельная обработка без потери порядка

Один поток poll + пул воркеров по ключу:

// Сохраняем порядок событий одного заказа, параллелим между заказами
Map<Integer, BlockingQueue<ConsumerRecord<String, OrderEvent>>> partitionQueues = new HashMap<>();
ExecutorService workers = Executors.newFixedThreadPool(12);

// При получении записей — роутим в очередь по партиции
for (ConsumerRecord<String, OrderEvent> record : records) {
    int partitionIndex = record.partition() % NUM_WORKERS;
    workerQueues.get(partitionIndex).offer(record);
}

// Каждый воркер обрабатывает свою очередь последовательно
// → порядок внутри ключа сохраняется

Python-клиент (confluent-kafka-python)

from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json

# Продюсер
producer = Producer({
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'batch.size': 65536,
    'linger.ms': 5,
    'retries': 2147483647,
    'delivery.timeout.ms': 120000,
})

def delivery_report(err, msg):
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce(
    'user-events',
    key=str(user_id),
    value=json.dumps(event).encode('utf-8'),
    callback=delivery_report
)
producer.flush()  # дожидаемся доставки всех pending сообщений

# Консьюмер
consumer = Consumer({
    'bootstrap.servers': 'kafka-1:9092',
    'group.id': 'web-app-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 30000,
})

consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError.PARTITION_EOF:
                continue
            raise KafkaException(msg.error())

        event = json.loads(msg.value().decode('utf-8'))
        process_event(event)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Таймлайн

День 1 — проектирование: определяем топики, ключи сообщений (для гарантии порядка), формат сообщений (Avro/JSON/Protobuf), группы консьюмеров.

День 2 — разработка продюсеров: сериализация, настройка acks/idempotence, интеграция с бизнес-логикой приложения.

День 3 — разработка консьюмеров: ручное управление офсетами, обработка rebalance, Dead Letter Queue для ошибочных сообщений.

День 4–5 — тестирование: unit-тесты с EmbeddedKafka или Testcontainers, интеграционные тесты, нагрузочный тест с kafka-producer-perf-test, проверка exactly-once семантики.

День 6 — деплой, настройка мониторинга consumer lag, алерты.