Настройка обмена данными через очереди сообщений (RabbitMQ) 1С-Битрикс
Синхронная передача данных между Битриксом и внешними системами прямо в обработчике события — ненадёжна. Если внешняя система недоступна, пользователь получает ошибку или событие теряется. RabbitMQ решает задачу: Битрикс публикует сообщение в очередь и немедленно возвращает управление, отдельный воркер забирает сообщение и доставляет в целевую систему.
Когда нужен RabbitMQ
- Внешняя система периодически недоступна (обслуживание, нестабильный канал).
- Объём событий нерегулярный: в пике — тысячи в минуту, в обычное время — десятки.
- Нужна гарантия доставки: каждое сообщение должно быть обработано ровно один раз.
- Несколько потребителей должны получать копии одного события (fan-out).
Публикация сообщений из Битрикса
Для работы с RabbitMQ из PHP — библиотека php-amqplib/php-amqplib. Устанавливаем через Composer в /local/:
cd /local && composer require php-amqplib/php-amqplib
Класс-издатель:
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitMQPublisher {
private static ?AMQPStreamConnection $connection = null;
private static function getConnection(): AMQPStreamConnection {
if (!self::$connection || !self::$connection->isConnected()) {
self::$connection = new AMQPStreamConnection(
COption::GetOptionString('site', 'rmq_host', 'localhost'),
COption::GetOptionInt('site', 'rmq_port', 5672),
COption::GetOptionString('site', 'rmq_user', 'guest'),
COption::GetOptionString('site', 'rmq_pass', 'guest'),
COption::GetOptionString('site', 'rmq_vhost', '/')
);
}
return self::$connection;
}
public static function publish(string $exchange, string $routingKey, array $payload): void {
$channel = self::getConnection()->channel();
$channel->exchange_declare($exchange, 'topic', false, true, false);
$msg = new AMQPMessage(
json_encode($payload),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // сообщение не теряется при рестарте
'content_type' => 'application/json']
);
$channel->basic_publish($msg, $exchange, $routingKey);
$channel->close();
}
}
Публикация при событиях Битрикса
// В init.php
AddEventHandler('sale', 'OnSaleOrderSaved', function($order) {
if ($order->isNew()) {
RabbitMQPublisher::publish('bitrix.events', 'order.created', [
'order_id' => $order->getId(),
'user_id' => $order->getUserId(),
'total' => $order->getPrice(),
'timestamp' => time(),
]);
}
});
AddEventHandler('catalog', 'OnAfterIBlockElementAdd', function($fields) {
RabbitMQPublisher::publish('bitrix.events', 'product.created', [
'element_id' => $fields['ID'],
'iblock_id' => $fields['IBLOCK_ID'],
'name' => $fields['NAME'],
]);
});
Воркер-потребитель
Воркер — отдельный PHP-процесс (или несколько), запущенный через Supervisor:
// worker.php
require '/local/vendor/autoload.php';
require $_SERVER['DOCUMENT_ROOT'] . '/bitrix/modules/main/include/prolog_before.php';
$connection = new AMQPStreamConnection(/* параметры */);
$channel = $connection->channel();
$channel->queue_declare('order.processor', false, true, false, false);
$channel->queue_bind('order.processor', 'bitrix.events', 'order.created');
$channel->basic_qos(null, 5, null); // не более 5 необработанных сообщений на воркера
$channel->basic_consume('order.processor', '', false, false, false, false,
function($msg) {
$data = json_decode($msg->getBody(), true);
try {
OrderSyncHandler::process($data);
$msg->ack(); // подтверждаем успешную обработку
} catch (\Throwable $e) {
$msg->nack(false, true); // возвращаем в очередь для повторной попытки
}
}
);
while ($channel->is_consuming()) {
$channel->wait();
}
Supervisor-конфиг (/etc/supervisor/conf.d/bitrix_worker.conf):
[program:bitrix_order_worker]
command=php /var/www/bitrix.loc/local/workers/order_worker.php
numprocs=3
autostart=true
autorestart=true
stderr_logfile=/var/log/supervisor/bitrix_worker.err.log
Dead Letter Queue
Сообщения, которые не удалось обработать N раз, перемещаются в DLQ (Dead Letter Queue) для ручного разбора. Настраивается при объявлении очереди:
$channel->queue_declare('order.processor', false, true, false, false, false, [
'x-dead-letter-exchange' => ['S', 'bitrix.dlx'],
'x-dead-letter-routing-key' => ['S', 'order.failed'],
'x-message-ttl' => ['I', 3600000], // 1 час TTL
]);
Мониторинг DLQ — через Management UI RabbitMQ (порт 15672) или через оповещения при росте очереди.
Настройка RabbitMQ для одного типа событий с воркером — 1–2 рабочих дня, включая Supervisor и мониторинг.







