Настройка очередей сообщений (Apache Kafka) для веб-приложения

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка очередей сообщений (Apache Kafka) для веб-приложения
Сложная
~5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка Apache Kafka как очереди сообщений

Kafka — распределённая платформа потоковой обработки событий. В отличие от RabbitMQ, сообщения в Kafka не удаляются после обработки — они хранятся в топике по retention-политике (например, 7 дней или 100 GB). Несколько consumer group могут независимо читать один топик с разными смещениями.

Когда Kafka, когда RabbitMQ

Kafka предпочтительнее для: event sourcing, аналитических pipeline, аудит-логов, потоковой обработки с replay. RabbitMQ — для task queue (email, SMS, PDF-генерация) с гарантированной однократной доставкой.

Установка через Docker

# docker-compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper_data:/var/lib/zookeeper/data
      - zookeeper_log:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168        # 7 дней
      KAFKA_LOG_RETENTION_BYTES: 107374182400  # 100 GB
      KAFKA_NUM_PARTITIONS: 6
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka_data:/var/lib/kafka/data
    ports:
      - "9092:9092"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on: [kafka]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    ports:
      - "8080:8080"

volumes:
  zookeeper_data:
  zookeeper_log:
  kafka_data:

Создание топиков

# Создать топик с 6 партициями и репликой 3 (для кластера)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic user-events \
  --partitions 6 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# Просмотр
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic user-events

PHP: rdkafka producer

use RdKafka\Producer;
use RdKafka\Conf;

class KafkaProducer
{
    private Producer $producer;

    public function __construct()
    {
        $conf = new Conf();
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('security.protocol', 'PLAINTEXT');
        $conf->set('acks', 'all');              // подтверждение от всех реплик
        $conf->set('retries', '3');
        $conf->set('enable.idempotence', 'true'); // ровно одна запись
        $conf->set('compression.type', 'snappy');

        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka delivery failed', [
                    'error' => $message->errstr(),
                    'topic' => $message->topic_name,
                ]);
            }
        });

        $this->producer = new Producer($conf);
    }

    public function publish(string $topic, string $key, array $payload): void
    {
        $rdTopic = $this->producer->newTopic($topic);
        $rdTopic->produce(
            partition: RD_KAFKA_PARTITION_UA,  // автовыбор партиции по key
            msgflags: 0,
            payload: json_encode($payload),
            key: $key,                          // один ключ → одна партиция → порядок событий
        );
        $this->producer->poll(0);
    }

    public function flush(): void
    {
        $result = $this->producer->flush(10000);  // 10 секунд таймаут
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Kafka flush failed: ' . rd_kafka_err2str($result));
        }
    }
}

// Использование
$producer->publish('user-events', (string) $user->id, [
    'event'     => 'user.registered',
    'user_id'   => $user->id,
    'email'     => $user->email,
    'timestamp' => now()->toIso8601String(),
]);
$producer->flush();

PHP: Consumer

use RdKafka\KafkaConsumer;
use RdKafka\Conf;

class UserEventConsumer
{
    public function run(): void
    {
        $conf = new Conf();
        $conf->set('group.id', 'user-events-analytics');
        $conf->set('bootstrap.servers', config('kafka.brokers'));
        $conf->set('enable.auto.commit', 'false');  // ручной commit
        $conf->set('auto.offset.reset', 'earliest');
        $conf->set('session.timeout.ms', '45000');
        $conf->set('max.poll.interval.ms', '300000');

        $conf->setRebalanceCb(function ($kafka, $err, $partitions) {
            if ($err === RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                $kafka->assign($partitions);
                Log::info('Partitions assigned', ['count' => count($partitions)]);
            } else {
                $kafka->assign(null);
            }
        });

        $consumer = new KafkaConsumer($conf);
        $consumer->subscribe(['user-events', 'order-events']);

        while (true) {
            $message = $consumer->consume(timeout_ms: 1000);

            if ($message === null) continue;
            if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) continue;
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                Log::error('Kafka consume error', ['error' => $message->errstr()]);
                continue;
            }

            try {
                $payload = json_decode($message->payload, true);
                $this->handle($message->topic_name, $payload);
                $consumer->commit($message);  // commit только после успешной обработки
            } catch (\Throwable $e) {
                Log::error('Processing failed', [
                    'topic'  => $message->topic_name,
                    'offset' => $message->offset,
                    'error'  => $e->getMessage(),
                ]);
                // Не делаем commit — сообщение будет перечитано
            }
        }
    }

    private function handle(string $topic, array $payload): void
    {
        match ($payload['event']) {
            'user.registered' => $this->onUserRegistered($payload),
            'user.deleted'    => $this->onUserDeleted($payload),
            default           => Log::debug('Unknown event', ['event' => $payload['event']]),
        };
    }
}

Node.js: kafkajs

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'myapp-api',
  brokers: [process.env.KAFKA_BROKERS!],
  retry: {
    retries: 5,
    initialRetryTime: 300,
    factor: 0.2,
  },
});

// Producer
const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true,
  maxInFlightRequests: 5,
});

await producer.connect();

await producer.send({
  topic: 'user-events',
  compression: CompressionTypes.Snappy,
  messages: [{
    key: String(userId),
    value: JSON.stringify({ event: 'user.login', userId, ip, timestamp: Date.now() }),
    headers: { 'content-type': 'application/json' },
  }],
});

// Consumer
const consumer = kafka.consumer({ groupId: 'audit-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const payload = JSON.parse(message.value!.toString());

    await AuditLog.create({
      event: payload.event,
      userId: payload.userId,
      metadata: payload,
    });
  },
});

Kafka Streams для агрегации

// Java: подсчёт событий за последние 5 минут
StreamsBuilder builder = new StreamsBuilder();

KStream<String, UserEvent> events = builder.stream("user-events");

events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    .to("user-activity-counts");

Schema Registry для контракта данных

# Avro схема для user-events
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.myapp.events",
  "fields": [
    {"name": "event", "type": "string"},
    {"name": "user_id", "type": "long"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

Schema Registry предотвращает несовместимые изменения формата сообщений между producer и consumer.

Мониторинг

# Lag consumer group (отставание от конца топика)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group user-events-analytics \
  --describe

# Grafana: kafka_consumer_group_lag > 10000 → алерт
# JMX Exporter → Prometheus → Grafana дашборд

Срок реализации

Задача Срок
Kafka + базовый producer/consumer PHP/Node.js 3–4 дня
Schema Registry + Avro +2 дня
Kafka Streams агрегация 3–5 дней
3-нодовый кластер Kafka в Kubernetes 4–5 дней