Настройка обмена данными через Apache Kafka для 1С-Битрикс
Apache Kafka — распределённый лог событий, не традиционный брокер очередей. Если RabbitMQ подходит для задачи «передать сообщение из A в B с гарантией доставки», Kafka подходит для «хранить поток событий, позволить множеству потребителей читать его независимо и воспроизводить с любой точки». Для Битрикса Kafka имеет смысл в крупных проектах с несколькими потребителями событий.
Когда Kafka вместо RabbitMQ
- Поток событий должен обрабатываться несколькими независимыми системами одновременно (аналитика, CRM, склад, маркетинг).
- Нужно воспроизвести события за прошлый период (retention до нескольких дней/недель).
- Объём событий — сотни тысяч в минуту.
- Event sourcing: событие — это источник истины, из которого восстанавливается состояние.
Для типичного интернет-магазина с несколькими внешними системами и без требований к воспроизведению событий — RabbitMQ проще и дешевле.
Клиент Kafka для PHP
Официального PHP-клиента от Apache нет. Используем arnaud-lb/php-rdkafka (биндинги к librdkafka):
# Установка librdkafka (Ubuntu)
apt-get install librdkafka-dev
# Установка PHP-расширения
pecl install rdkafka
# PHP-обёртка
cd /local && composer require arnaud-lb/php-rdkafka
Producer: публикация событий из Битрикса
class KafkaProducer {
private \RdKafka\Producer $producer;
public function __construct() {
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list',
COption::GetOptionString('site', 'kafka_brokers', 'kafka:9092'));
$conf->set('security.protocol', 'PLAINTEXT');
// Для production с SSL:
// $conf->set('security.protocol', 'SSL');
// $conf->set('ssl.ca.location', '/etc/kafka/certs/ca-cert');
$this->producer = new \RdKafka\Producer($conf);
}
public function publish(string $topic, string $key, array $payload): void {
$topic = $this->producer->newTopic($topic);
$topic->produce(
\RD_KAFKA_PARTITION_UA, // автовыбор партиции
0,
json_encode($payload),
$key // ключ партиционирования — например, user_id для упорядоченности событий пользователя
);
$this->producer->flush(1000); // ждём 1 сек подтверждения
}
}
// Использование в обработчиках событий
AddEventHandler('sale', 'OnSaleOrderSaved', function($order) {
$kafka = new KafkaProducer();
$kafka->publish('bitrix.orders', (string)$order->getUserId(), [
'event' => $order->isNew() ? 'order.created' : 'order.updated',
'order_id' => $order->getId(),
'status' => $order->getField('STATUS_ID'),
'total' => $order->getPrice(),
'ts' => time(),
]);
});
Consumer: потребитель событий
Потребитель запускается как отдельный демон (не в контексте Битрикса — в контексте PHP-CLI):
// kafka_consumer.php
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'crm-sync-group');
$conf->set('metadata.broker.list', 'kafka:9092');
$conf->set('auto.offset.reset', 'latest'); // читать с конца, не с начала
$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['bitrix.orders', 'bitrix.products']);
while (true) {
$message = $consumer->consume(5000); // timeout 5 сек
if ($message->err === \RD_KAFKA_RESP_ERR_NO_ERROR) {
$payload = json_decode($message->payload, true);
try {
EventDispatcher::dispatch($message->topic_name, $payload);
// Kafka сама управляет оффсетами при use group.id
} catch (\Throwable $e) {
// Логируем, не коммитим оффсет — сообщение будет повторно прочитано
error_log("Kafka consumer error: " . $e->getMessage());
}
}
}
Топики и партиции
| Топик | Ключ партиции | Потребители |
|---|---|---|
bitrix.orders |
user_id | CRM, склад, аналитика |
bitrix.products |
iblock_element_id | Поисковый индекс, рекомендации |
bitrix.users |
user_id | CDP, email-маркетинг |
bitrix.carts |
user_id | Аналитика брошенных корзин |
Количество партиций = максимальный параллелизм потребителей. Для старта — 3–6 партиций на топик.
Мониторинг Kafka
Lag потребителей — ключевая метрика: разница между последним опубликованным сообщением и последним прочитанным потребителем. Если lag растёт — потребитель не справляется. Мониторинг через Kafka UI (Provectus) или CMAK, оповещения в Telegram через alertmanager.
Настройка Kafka для Битрикса с двумя–тремя топиками и тестовым потребителем — 2–3 рабочих дня на инфраструктуру и 1 день на интеграцию с Битриксом.







