Настройка очередей сообщений (RabbitMQ) для веб-приложения

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка очередей сообщений (RabbitMQ) для веб-приложения
Сложная
~3-5 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка RabbitMQ как очереди сообщений

RabbitMQ — брокер сообщений на протоколе AMQP. Разделяет задачи между компонентами системы: веб-приложение публикует задачи (emails, PDF-генерация, уведомления), воркеры обрабатывают их асинхронно. При падении воркера сообщения остаются в очереди и будут обработаны после перезапуска.

Установка через Docker

# docker-compose.yml
services:
  rabbitmq:
    image: rabbitmq:3.13-management-alpine
    environment:
      RABBITMQ_DEFAULT_USER: myapp
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD}
      RABBITMQ_DEFAULT_VHOST: myapp
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  rabbitmq_data:

Ключевые концепции

Exchange — точка входа для публикации. Распределяет сообщения по очередям по правилам:

  • direct — точное совпадение routing key
  • topic — паттерн с * и #
  • fanout — всем подписанным очередям
  • headers — по заголовкам сообщения

Queue — буфер хранения сообщений до обработки воркером.

Binding — связь между exchange и queue с routing key.

Топология для веб-приложения

[App] → [myapp.exchange (topic)] → myapp.emails → [Email Worker]
                                 → myapp.notifications → [Push Worker]
                                 → myapp.reports → [Report Worker]

PHP: phpamqplib

// Публикация сообщения
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitMQPublisher
{
    private AMQPStreamConnection $connection;
    private \PhpAmqpLib\Channel\AMQPChannel $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            host: config('rabbitmq.host'),
            port: config('rabbitmq.port', 5672),
            user: config('rabbitmq.user'),
            password: config('rabbitmq.password'),
            vhost: config('rabbitmq.vhost', '/'),
        );
        $this->channel = $this->connection->channel();
        $this->setup();
    }

    private function setup(): void
    {
        // Durable exchange — переживает перезапуск брокера
        $this->channel->exchange_declare(
            exchange: 'myapp.exchange',
            type: 'topic',
            durable: true,
            auto_delete: false,
        );

        // Dead Letter Queue для неудачных сообщений
        $this->channel->queue_declare(
            queue: 'myapp.dlq',
            durable: true,
            arguments: new AMQPTable(['x-queue-type' => 'classic'])
        );

        // Основная очередь с привязкой к DLQ
        $this->channel->queue_declare(
            queue: 'myapp.emails',
            durable: true,
            arguments: new AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => 'myapp.dlq',
                'x-message-ttl' => 86400000,  // 24 часа в мс
            ])
        );

        $this->channel->queue_bind('myapp.emails', 'myapp.exchange', 'emails.*');
        $this->channel->queue_bind('myapp.notifications', 'myapp.exchange', 'notifications.*');
    }

    public function publish(string $routingKey, array $payload): void
    {
        $message = new AMQPMessage(
            json_encode($payload),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'content_type'  => 'application/json',
                'message_id'    => Str::uuid()->toString(),
                'timestamp'     => time(),
            ]
        );

        $this->channel->basic_publish($message, 'myapp.exchange', $routingKey);
    }

    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Использование
$publisher->publish('emails.welcome', [
    'user_id' => $user->id,
    'email'   => $user->email,
    'name'    => $user->name,
]);

PHP: Consumer воркер

class EmailWorker
{
    public function run(): void
    {
        $channel = $this->getChannel();

        // Prefetch: не брать более 1 сообщения без подтверждения
        $channel->basic_qos(prefetch_size: 0, prefetch_count: 1, global: false);

        $channel->basic_consume(
            queue: 'myapp.emails',
            consumer_tag: gethostname() . '.email',
            no_ack: false,  // ручное подтверждение обязательно
            callback: [$this, 'handleEmail'],
        );

        while ($channel->is_consuming()) {
            $channel->wait(timeout: 60);
        }
    }

    public function handleEmail(AMQPMessage $message): void
    {
        try {
            $payload = json_decode($message->getBody(), true);

            Mail::to($payload['email'])->send(new WelcomeMail($payload));

            // Подтверждение успешной обработки
            $message->ack();

            Log::info('Email sent', ['user_id' => $payload['user_id']]);

        } catch (\Throwable $e) {
            Log::error('Email failed', ['error' => $e->getMessage()]);

            // Requeue только если это первая попытка
            $requeue = !$message->has('application_headers') ||
                       ($message->get('application_headers')->getNativeData()['x-death'] ?? null) === null;

            $message->nack(requeue: $requeue);
        }
    }
}

Node.js: amqplib

import amqp, { Connection, Channel } from 'amqplib';

class MessageBus {
  private connection!: Connection;
  private channel!: Channel;

  async connect(): Promise<void> {
    this.connection = await amqp.connect({
      hostname: process.env.RABBITMQ_HOST,
      port: 5672,
      username: process.env.RABBITMQ_USER,
      password: process.env.RABBITMQ_PASS,
      vhost: process.env.RABBITMQ_VHOST,
      heartbeat: 60,
    });

    this.channel = await this.connection.createChannel();
    await this.channel.prefetch(5);

    await this.channel.assertExchange('myapp.exchange', 'topic', { durable: true });
    await this.channel.assertQueue('myapp.notifications', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'myapp.dlq',
      },
    });
    await this.channel.bindQueue('myapp.notifications', 'myapp.exchange', 'notifications.*');
  }

  async publish(routingKey: string, payload: object): Promise<void> {
    const content = Buffer.from(JSON.stringify(payload));
    this.channel.publish('myapp.exchange', routingKey, content, {
      persistent: true,
      contentType: 'application/json',
      messageId: crypto.randomUUID(),
      timestamp: Math.floor(Date.now() / 1000),
    });
  }

  async consume(queue: string, handler: (payload: unknown) => Promise<void>): Promise<void> {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const payload = JSON.parse(msg.content.toString());
        await handler(payload);
        this.channel.ack(msg);
      } catch (err) {
        console.error('Message processing failed:', err);
        this.channel.nack(msg, false, false);  // отправить в DLQ
      }
    });
  }
}

Laravel Queue с RabbitMQ

Через пакет vladimir-yuldashev/laravel-queue-rabbitmq:

QUEUE_CONNECTION=rabbitmq
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_VHOST=myapp
RABBITMQ_LOGIN=myapp
RABBITMQ_PASSWORD=secret
RABBITMQ_QUEUE=myapp.jobs
// config/queue.php
'rabbitmq' => [
    'driver'   => 'rabbitmq',
    'queue'    => env('RABBITMQ_QUEUE', 'default'),
    'hosts'    => [[
        'host'     => env('RABBITMQ_HOST', '127.0.0.1'),
        'port'     => env('RABBITMQ_PORT', 5672),
        'user'     => env('RABBITMQ_LOGIN', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost'    => env('RABBITMQ_VHOST', '/'),
    ]],
    'options' => [
        'queue' => [
            'exchange'      => 'myapp.exchange',
            'exchange_type' => 'topic',
            'exchange_routing_key' => 'jobs.*',
        ],
    ],
],
// Стандартный Laravel Job
class SendWelcomeEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3;
    public int $backoff = 60;

    public function __construct(private User $user) {}

    public function handle(): void
    {
        Mail::to($this->user)->send(new WelcomeMail($this->user));
    }
}

// Публикация
SendWelcomeEmail::dispatch($user)->onQueue('myapp.emails');

Мониторинг через Management API

# Количество сообщений в очереди
curl -s -u myapp:password \
  "http://rabbitmq:15672/api/queues/myapp/myapp.emails" | \
  jq '.messages, .consumers'

# Алерт: очередь растёт
# Grafana: rabbitmq_queue_messages > 1000 → Slack notification

Срок реализации

Задача Срок
RabbitMQ + базовый producer/consumer 2–3 дня
Laravel Queue интеграция 1–2 дня
Dead Letter Queue + мониторинг +1–2 дня
HA кластер RabbitMQ (3 ноды) 3–4 дня