Настройка Exchange и Queue RabbitMQ (Direct, Fanout, Topic)
RabbitMQ не доставляет сообщения напрямую в очереди — между продюсером и очередью стоит exchange. Тип exchange определяет, куда пойдёт сообщение. Правильный выбор типа сокращает сложность маршрутизации в коде.
Четыре типа exchange
Direct — точное совпадение routing key. Сообщение с key order.created идёт в очереди, привязанные с тем же ключом. Один-к-одному или один-ко-многим (несколько очередей с одним ключом).
Fanout — routing key игнорируется, сообщение рассылается во все привязанные очереди. Broadcast: событие «пользователь авторизовался» нужно получить и логгеру, и сервису сессий, и аналитике.
Topic — routing key с wildcards. * — одно слово, # — ноль или более слов. order.*.created совпадёт с order.express.created и order.regular.created. order.# совпадёт с любым ключом, начинающимся с order..
Headers — маршрутизация по заголовкам AMQP, routing key игнорируется. Используется редко, когда маршрут определяется несколькими атрибутами.
Создание через CLI
# Direct exchange
rabbitmqadmin declare exchange \
name=orders \
type=direct \
durable=true \
auto_delete=false
# Fanout exchange
rabbitmqadmin declare exchange \
name=events-broadcast \
type=fanout \
durable=true
# Topic exchange
rabbitmqadmin declare exchange \
name=app-events \
type=topic \
durable=true \
arguments='{"alternate-exchange":"app-events-unrouted"}'
# Очереди
rabbitmqadmin declare queue \
name=order-processing \
durable=true \
arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"dlx","x-dead-letter-routing-key":"order-processing.failed","x-delivery-limit":5}'
rabbitmqadmin declare queue \
name=order-notifications \
durable=true \
arguments='{"x-queue-type":"quorum"}'
# Bindings
rabbitmqadmin declare binding \
source=orders \
destination=order-processing \
routing_key=order.created
rabbitmqadmin declare binding \
source=app-events \
destination=order-processing \
routing_key="order.#"
rabbitmqadmin declare binding \
source=app-events \
destination=order-notifications \
routing_key="order.*.created"
Создание через Management HTTP API
BASE="http://rabbit-1:15672/api"
AUTH="admin:password"
# Создаём topic exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/app-events" \
-H "Content-Type: application/json" \
-d '{
"type": "topic",
"durable": true,
"auto_delete": false,
"arguments": {
"alternate-exchange": "app-events-unrouted"
}
}'
# Dead Letter Exchange
curl -u $AUTH -X PUT "$BASE/exchanges/%2F/dlx" \
-H "Content-Type: application/json" \
-d '{"type": "direct", "durable": true}'
# DLQ очередь
curl -u $AUTH -X PUT "$BASE/queues/%2F/failed-messages" \
-H "Content-Type: application/json" \
-d '{
"durable": true,
"arguments": {
"x-queue-type": "quorum",
"x-message-ttl": 2592000000
}
}'
# Привязываем DLQ к DLX
curl -u $AUTH -X POST "$BASE/bindings/%2F/e/dlx/q/failed-messages" \
-H "Content-Type: application/json" \
-d '{"routing_key": "#"}'
PHP-продюсер через php-amqplib
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class EventPublisher
{
private AMQPLazyConnection $connection;
private ?\AMQPChannel $channel = null;
public function __construct(
private readonly array $hosts, // [['host'=>'rabbit-1','port'=>5672,'user'=>'...','password'=>'...']]
) {}
private function channel(): \AMQPChannel
{
if ($this->channel === null || !$this->channel->is_open()) {
$this->connection = AMQPLazyConnection::create_connection($this->hosts, [
'heartbeat' => 60,
'connection_timeout' => 5,
'read_write_timeout' => 10,
]);
$this->channel = $this->connection->channel();
// Подтверждение доставки
$this->channel->confirm_select();
}
return $this->channel;
}
public function publish(string $exchange, string $routingKey, array $payload): void
{
$channel = $this->channel();
$message = new AMQPMessage(
json_encode($payload, JSON_THROW_ON_ERROR),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_type' => 'application/json',
'message_id' => (string) Str::uuid(),
'timestamp' => time(),
'app_id' => 'webapp',
'headers' => new AMQPTable([
'x-retry-count' => 0,
'x-source' => 'api',
]),
]
);
$channel->basic_publish($message, $exchange, $routingKey);
// Ждём подтверждения от брокера
$channel->wait_for_pending_acks(5.0);
}
}
// Использование
$publisher->publish('app-events', 'order.express.created', [
'order_id' => 12345,
'user_id' => 67890,
'amount' => 1999.99,
]);
PHP-консьюмер
class OrderConsumer
{
public function consume(): void
{
$channel = $this->connection->channel();
$channel->basic_qos(null, 20, null); // prefetch: не более 20 сообщений без ack
$channel->basic_consume(
'order-processing',
'', // consumer tag (auto-generated)
false, // no_local
false, // no_ack — ручное подтверждение
false, // exclusive
false, // nowait
function (AMQPMessage $message) {
try {
$payload = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
$this->processOrder($payload);
$message->ack();
} catch (\Throwable $e) {
// Requeue=false → сообщение пойдёт в DLX по настройке очереди
$message->nack(false);
Log::error('Failed to process order', ['error' => $e->getMessage()]);
}
}
);
while ($channel->is_consuming()) {
$channel->wait(null, false, 5.0); // timeout 5s для graceful shutdown
if ($this->shouldStop()) break;
}
$channel->close();
}
}
Python-клиент через pika
import pika
import json
credentials = pika.PlainCredentials('webapp', 'password')
parameters = pika.ConnectionParameters(
host='rabbit-1',
port=5672,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=30,
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# Публикация в topic exchange
channel.basic_publish(
exchange='app-events',
routing_key='user.premium.upgraded',
body=json.dumps({'user_id': 42, 'plan': 'premium'}),
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent,
content_type='application/json',
)
)
connection.close()
Типичные паттерны маршрутизации
Work Queue — несколько воркеров читают из одной очереди, RabbitMQ распределяет round-robin. Direct exchange, один binding.
Pub/Sub — одно событие получают все подписчики. Fanout exchange, каждый сервис имеет свою очередь.
Routing — разные события идут в разные очереди. Direct или topic exchange.
RPC — запрос-ответ через очереди. Продюсер создаёт временную очередь, указывает её в reply_to, консьюмер отвечает туда.
Таймлайн
День 1 — проектирование схемы exchanges/queues/bindings под бизнес-логику приложения. Создание через Management UI или CLI.
День 2 — интеграция продюсеров, настройка confirm mode, graceful reconnect при потере соединения. Интеграция консьюмеров с ручным ack.
День 3 — тестирование маршрутизации, проверка DLX, нагрузочный тест с rabbitmq-perf-test.







