Настройка Exchange и Queue RabbitMQ (Direct, Fanout, Topic)

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка Exchange и Queue RabbitMQ (Direct, Fanout, Topic)
Средняя
~2-3 рабочих дня
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка Exchange и Queue RabbitMQ (Direct, Fanout, Topic)

RabbitMQ не доставляет сообщения напрямую в очереди — между продюсером и очередью стоит exchange. Тип exchange определяет, куда пойдёт сообщение. Правильный выбор типа сокращает сложность маршрутизации в коде.

Четыре типа exchange

Direct — точное совпадение routing key. Сообщение с key order.created идёт в очереди, привязанные с тем же ключом. Один-к-одному или один-ко-многим (несколько очередей с одним ключом).

Fanout — routing key игнорируется, сообщение рассылается во все привязанные очереди. Broadcast: событие «пользователь авторизовался» нужно получить и логгеру, и сервису сессий, и аналитике.

Topic — routing key с wildcards. * — одно слово, # — ноль или более слов. order.*.created совпадёт с order.express.created и order.regular.created. order.# совпадёт с любым ключом, начинающимся с order..

Headers — маршрутизация по заголовкам AMQP, routing key игнорируется. Используется редко, когда маршрут определяется несколькими атрибутами.

Создание через CLI

# Direct exchange
rabbitmqadmin declare exchange \
    name=orders \
    type=direct \
    durable=true \
    auto_delete=false

# Fanout exchange
rabbitmqadmin declare exchange \
    name=events-broadcast \
    type=fanout \
    durable=true

# Topic exchange
rabbitmqadmin declare exchange \
    name=app-events \
    type=topic \
    durable=true \
    arguments='{"alternate-exchange":"app-events-unrouted"}'

# Очереди
rabbitmqadmin declare queue \
    name=order-processing \
    durable=true \
    arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"order-processing.failed","x-delivery-limit":5}'

rabbitmqadmin declare queue \
    name=order-notifications \
    durable=true \
    arguments='{"x-queue-type":"quorum"}'

# Bindings
rabbitmqadmin declare binding \
    source=orders \
    destination=order-processing \
    routing_key=order.created

rabbitmqadmin declare binding \
    source=app-events \
    destination=order-processing \
    routing_key="order.#"

rabbitmqadmin declare binding \
    source=app-events \
    destination=order-notifications \
    routing_key="order.*.created"

Создание через Management HTTP API

BASE="http://rabbit-1:15672/api"
AUTH="admin:password"

# Создаём topic exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/app-events" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "topic",
    "durable": true,
    "auto_delete": false,
    "arguments": {
      "alternate-exchange": "app-events-unrouted"
    }
  }'

# Dead Letter Exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/dlx" \
  -H "Content-Type: application/json" \
  -d '{"type": "direct", "durable": true}'

# DLQ очередь
curl -u $AUTH -X PUT "$BASE/queues/%2F/failed-messages" \
  -H "Content-Type: application/json" \
  -d '{
    "durable": true,
    "arguments": {
      "x-queue-type": "quorum",
      "x-message-ttl": 2592000000
    }
  }'

# Привязываем DLQ к DLX
curl -u $AUTH -X POST "$BASE/bindings/%2F/e/dlx/q/failed-messages" \
  -H "Content-Type: application/json" \
  -d '{"routing_key": "#"}'

PHP-продюсер через php-amqplib

use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class EventPublisher
{
    private AMQPLazyConnection $connection;
    private ?\AMQPChannel $channel = null;

    public function __construct(
        private readonly array $hosts, // [['host'=>'rabbit-1','port'=>5672,'user'=>'...','password'=>'...']]
    ) {}

    private function channel(): \AMQPChannel
    {
        if ($this->channel === null || !$this->channel->is_open()) {
            $this->connection = AMQPLazyConnection::create_connection($this->hosts, [
                'heartbeat' => 60,
                'connection_timeout' => 5,
                'read_write_timeout' => 10,
            ]);
            $this->channel = $this->connection->channel();
            // Подтверждение доставки
            $this->channel->confirm_select();
        }
        return $this->channel;
    }

    public function publish(string $exchange, string $routingKey, array $payload): void
    {
        $channel = $this->channel();

        $message = new AMQPMessage(
            json_encode($payload, JSON_THROW_ON_ERROR),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type'  => 'application/json',
                'message_id'    => (string) Str::uuid(),
                'timestamp'     => time(),
                'app_id'        => 'webapp',
                'headers'       => new AMQPTable([
                    'x-retry-count' => 0,
                    'x-source'      => 'api',
                ]),
            ]
        );

        $channel->basic_publish($message, $exchange, $routingKey);

        // Ждём подтверждения от брокера
        $channel->wait_for_pending_acks(5.0);
    }
}

// Использование
$publisher->publish('app-events', 'order.express.created', [
    'order_id' => 12345,
    'user_id'  => 67890,
    'amount'   => 1999.99,
]);

PHP-консьюмер

class OrderConsumer
{
    public function consume(): void
    {
        $channel = $this->connection->channel();
        $channel->basic_qos(null, 20, null); // prefetch: не более 20 сообщений без ack

        $channel->basic_consume(
            'order-processing',
            '',     // consumer tag (auto-generated)
            false,  // no_local
            false,  // no_ack — ручное подтверждение
            false,  // exclusive
            false,  // nowait
            function (AMQPMessage $message) {
                try {
                    $payload = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
                    $this->processOrder($payload);
                    $message->ack();
                } catch (\Throwable $e) {
                    // Requeue=false → сообщение пойдёт в DLX по настройке очереди
                    $message->nack(false);
                    Log::error('Failed to process order', ['error' => $e->getMessage()]);
                }
            }
        );

        while ($channel->is_consuming()) {
            $channel->wait(null, false, 5.0); // timeout 5s для graceful shutdown
            if ($this->shouldStop()) break;
        }

        $channel->close();
    }
}

Python-клиент через pika

import pika
import json

credentials = pika.PlainCredentials('webapp', 'password')
parameters = pika.ConnectionParameters(
    host='rabbit-1',
    port=5672,
    credentials=credentials,
    heartbeat=60,
    blocked_connection_timeout=30,
)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Публикация в topic exchange
channel.basic_publish(
    exchange='app-events',
    routing_key='user.premium.upgraded',
    body=json.dumps({'user_id': 42, 'plan': 'premium'}),
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent,
        content_type='application/json',
    )
)
connection.close()

Типичные паттерны маршрутизации

Work Queue — несколько воркеров читают из одной очереди, RabbitMQ распределяет round-robin. Direct exchange, один binding.

Pub/Sub — одно событие получают все подписчики. Fanout exchange, каждый сервис имеет свою очередь.

Routing — разные события идут в разные очереди. Direct или topic exchange.

RPC — запрос-ответ через очереди. Продюсер создаёт временную очередь, указывает её в reply_to, консьюмер отвечает туда.

Таймлайн

День 1 — проектирование схемы exchanges/queues/bindings под бизнес-логику приложения. Создание через Management UI или CLI.

День 2 — интеграция продюсеров, настройка confirm mode, graceful reconnect при потере соединения. Интеграция консьюмеров с ручным ack.

День 3 — тестирование маршрутизации, проверка DLX, нагрузочный тест с rabbitmq-perf-test.