Настройка Apache Kafka как очереди сообщений
Kafka — распределённая платформа потоковой обработки событий. В отличие от RabbitMQ, сообщения в Kafka не удаляются после обработки — они хранятся в топике по retention-политике (например, 7 дней или 100 GB). Несколько consumer group могут независимо читать один топик с разными смещениями.
Когда Kafka, когда RabbitMQ
Kafka предпочтительнее для: event sourcing, аналитических pipeline, аудит-логов, потоковой обработки с replay. RabbitMQ — для task queue (email, SMS, PDF-генерация) с гарантированной однократной доставкой.
Установка через Docker
# docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- zookeeper_data:/var/lib/zookeeper/data
- zookeeper_log:/var/lib/zookeeper/log
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_LOG_RETENTION_HOURS: 168 # 7 дней
KAFKA_LOG_RETENTION_BYTES: 107374182400 # 100 GB
KAFKA_NUM_PARTITIONS: 6
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
volumes:
- kafka_data:/var/lib/kafka/data
ports:
- "9092:9092"
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on: [kafka]
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
ports:
- "8080:8080"
volumes:
zookeeper_data:
zookeeper_log:
kafka_data:
Создание топиков
# Создать топик с 6 партициями и репликой 3 (для кластера)
kafka-topics.sh --bootstrap-server kafka:9092 \
--create \
--topic user-events \
--partitions 6 \
--replication-factor 1 \
--config retention.ms=604800000 \
--config cleanup.policy=delete
# Просмотр
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic user-events
PHP: rdkafka producer
use RdKafka\Producer;
use RdKafka\Conf;
class KafkaProducer
{
private Producer $producer;
public function __construct()
{
$conf = new Conf();
$conf->set('bootstrap.servers', config('kafka.brokers'));
$conf->set('security.protocol', 'PLAINTEXT');
$conf->set('acks', 'all'); // подтверждение от всех реплик
$conf->set('retries', '3');
$conf->set('enable.idempotence', 'true'); // ровно одна запись
$conf->set('compression.type', 'snappy');
$conf->setDrMsgCb(function ($kafka, $message) {
if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
Log::error('Kafka delivery failed', [
'error' => $message->errstr(),
'topic' => $message->topic_name,
]);
}
});
$this->producer = new Producer($conf);
}
public function publish(string $topic, string $key, array $payload): void
{
$rdTopic = $this->producer->newTopic($topic);
$rdTopic->produce(
partition: RD_KAFKA_PARTITION_UA, // автовыбор партиции по key
msgflags: 0,
payload: json_encode($payload),
key: $key, // один ключ → одна партиция → порядок событий
);
$this->producer->poll(0);
}
public function flush(): void
{
$result = $this->producer->flush(10000); // 10 секунд таймаут
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Kafka flush failed: ' . rd_kafka_err2str($result));
}
}
}
// Использование
$producer->publish('user-events', (string) $user->id, [
'event' => 'user.registered',
'user_id' => $user->id,
'email' => $user->email,
'timestamp' => now()->toIso8601String(),
]);
$producer->flush();
PHP: Consumer
use RdKafka\KafkaConsumer;
use RdKafka\Conf;
class UserEventConsumer
{
public function run(): void
{
$conf = new Conf();
$conf->set('group.id', 'user-events-analytics');
$conf->set('bootstrap.servers', config('kafka.brokers'));
$conf->set('enable.auto.commit', 'false'); // ручной commit
$conf->set('auto.offset.reset', 'earliest');
$conf->set('session.timeout.ms', '45000');
$conf->set('max.poll.interval.ms', '300000');
$conf->setRebalanceCb(function ($kafka, $err, $partitions) {
if ($err === RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
$kafka->assign($partitions);
Log::info('Partitions assigned', ['count' => count($partitions)]);
} else {
$kafka->assign(null);
}
});
$consumer = new KafkaConsumer($conf);
$consumer->subscribe(['user-events', 'order-events']);
while (true) {
$message = $consumer->consume(timeout_ms: 1000);
if ($message === null) continue;
if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) continue;
if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
Log::error('Kafka consume error', ['error' => $message->errstr()]);
continue;
}
try {
$payload = json_decode($message->payload, true);
$this->handle($message->topic_name, $payload);
$consumer->commit($message); // commit только после успешной обработки
} catch (\Throwable $e) {
Log::error('Processing failed', [
'topic' => $message->topic_name,
'offset' => $message->offset,
'error' => $e->getMessage(),
]);
// Не делаем commit — сообщение будет перечитано
}
}
}
private function handle(string $topic, array $payload): void
{
match ($payload['event']) {
'user.registered' => $this->onUserRegistered($payload),
'user.deleted' => $this->onUserDeleted($payload),
default => Log::debug('Unknown event', ['event' => $payload['event']]),
};
}
}
Node.js: kafkajs
import { Kafka, CompressionTypes } from 'kafkajs';
const kafka = new Kafka({
clientId: 'myapp-api',
brokers: [process.env.KAFKA_BROKERS!],
retry: {
retries: 5,
initialRetryTime: 300,
factor: 0.2,
},
});
// Producer
const producer = kafka.producer({
allowAutoTopicCreation: false,
idempotent: true,
maxInFlightRequests: 5,
});
await producer.connect();
await producer.send({
topic: 'user-events',
compression: CompressionTypes.Snappy,
messages: [{
key: String(userId),
value: JSON.stringify({ event: 'user.login', userId, ip, timestamp: Date.now() }),
headers: { 'content-type': 'application/json' },
}],
});
// Consumer
const consumer = kafka.consumer({ groupId: 'audit-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const payload = JSON.parse(message.value!.toString());
await AuditLog.create({
event: payload.event,
userId: payload.userId,
metadata: payload,
});
},
});
Kafka Streams для агрегации
// Java: подсчёт событий за последние 5 минут
StreamsBuilder builder = new StreamsBuilder();
KStream<String, UserEvent> events = builder.stream("user-events");
events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.to("user-activity-counts");
Schema Registry для контракта данных
# Avro схема для user-events
{
"type": "record",
"name": "UserEvent",
"namespace": "com.myapp.events",
"fields": [
{"name": "event", "type": "string"},
{"name": "user_id", "type": "long"},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
]
}
Schema Registry предотвращает несовместимые изменения формата сообщений между producer и consumer.
Мониторинг
# Lag consumer group (отставание от конца топика)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group user-events-analytics \
--describe
# Grafana: kafka_consumer_group_lag > 10000 → алерт
# JMX Exporter → Prometheus → Grafana дашборд
Срок реализации
| Задача | Срок |
|---|---|
| Kafka + базовый producer/consumer PHP/Node.js | 3–4 дня |
| Schema Registry + Avro | +2 дня |
| Kafka Streams агрегация | 3–5 дней |
| 3-нодовый кластер Kafka в Kubernetes | 4–5 дней |







