Настройка очередей сообщений (Redis Pub/Sub) для веб-приложения

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка очередей сообщений (Redis Pub/Sub) для веб-приложения
Средняя
~2-3 рабочих дня
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка Redis Pub/Sub и Streams как очереди сообщений

Redis предоставляет два механизма для асинхронных сообщений: Pub/Sub — простой fire-and-forget без персистентности, и Streams — персистентная очередь с группами потребителей, похожая на облегчённый Kafka.

Redis Pub/Sub

Подходит для real-time уведомлений внутри приложения. Сообщения не сохраняются — если подписчик отключён, сообщение теряется.

// Laravel: публикация через Redis Pub/Sub
use Illuminate\Support\Facades\Redis;

// Publisher
Redis::publish('user-notifications', json_encode([
    'user_id' => $userId,
    'type'    => 'order.shipped',
    'message' => 'Ваш заказ отправлен',
]));

// Subscriber (console command)
class RedisSubscribeCommand extends Command
{
    protected $signature = 'redis:subscribe';

    public function handle(): void
    {
        Redis::subscribe(['user-notifications'], function (string $message) {
            $data = json_decode($message, true);
            broadcast(new UserNotificationEvent($data));  // → WebSocket
        });
    }
}

Redis Streams

Streams — правильный выбор для task queue на Redis. Сообщения хранятся в потоке, consumer groups отслеживают прогресс, pending entries — необработанные сообщения.

# Создать поток и добавить сообщение
XADD emails * user_id 123 email [email protected] template welcome

# Создать consumer group
XGROUP CREATE emails email-workers $ MKSTREAM

# Читать новые сообщения (воркер 1)
XREADGROUP GROUP email-workers worker-1 COUNT 10 BLOCK 5000 STREAMS emails >

# Подтвердить обработку
XACK emails email-workers <message-id>

PHP: Redis Streams воркер

use Illuminate\Support\Facades\Redis;

class RedisStreamWorker
{
    private string $stream = 'emails';
    private string $group = 'email-workers';
    private string $consumer;

    public function __construct()
    {
        $this->consumer = gethostname() . ':' . getmypid();
        $this->ensureGroup();
    }

    private function ensureGroup(): void
    {
        try {
            Redis::xgroup('CREATE', $this->stream, $this->group, '$', true);
        } catch (\Throwable) {
            // Группа уже существует
        }
    }

    public function run(): void
    {
        while (true) {
            // Сначала обработать pending (не подтверждённые с прошлого запуска)
            $pending = Redis::xreadgroup(
                $this->group, $this->consumer,
                [$this->stream => '0'],  // '0' = pending messages
                10
            );
            $this->processMessages($pending);

            // Затем новые сообщения
            $messages = Redis::xreadgroup(
                $this->group, $this->consumer,
                [$this->stream => '>'],  // '>' = only new
                10,
                5000  // блокировка 5 секунд
            );
            $this->processMessages($messages);
        }
    }

    private function processMessages(?array $streams): void
    {
        if (!$streams) return;

        foreach ($streams[$this->stream] ?? [] as [$id, $fields]) {
            try {
                $this->handleEmail($fields);
                Redis::xack($this->stream, $this->group, $id);
            } catch (\Throwable $e) {
                Log::error('Stream message failed', ['id' => $id, 'error' => $e->getMessage()]);
                // Сообщение остаётся в pending — будет перечитано при следующем запуске
            }
        }
    }

    private function handleEmail(array $fields): void
    {
        Mail::to($fields['email'])->send(new TemplateMail($fields['template'], $fields));
    }
}

Node.js: ioredis Streams

import Redis from 'ioredis';

const redis = new Redis({ host: 'redis', port: 6379 });
const STREAM = 'emails';
const GROUP = 'email-workers';
const CONSUMER = `worker-${process.pid}`;

async function startWorker(): Promise<void> {
  // Создать группу если не существует
  try {
    await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
  } catch { /* group exists */ }

  while (true) {
    const messages = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER,
      'COUNT', '10',
      'BLOCK', '5000',
      'STREAMS', STREAM, '>'
    ) as [string, [string, string[]][]][] | null;

    if (!messages) continue;

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        const data = Object.fromEntries(
          fields.reduce((acc, val, i) => (i % 2 === 0 ? acc.push([val, fields[i+1]]) : acc, acc), [] as [string,string][])
        );

        try {
          await sendEmail(data);
          await redis.xack(STREAM, GROUP, id);
        } catch (err) {
          console.error('Email failed:', id, err);
        }
      }
    }
  }
}

Trimming и очистка потока

# Обрезать поток до 10000 последних сообщений
XTRIM emails MAXLEN ~ 10000

# Автоматически при добавлении
XADD emails MAXLEN ~ 100000 * user_id 123 template welcome

Сравнение Redis механизмов

Характеристика Pub/Sub Streams Lists (LPUSH/BRPOP)
Персистентность Нет Да Да
Consumer groups Нет Да Нет
Replay истории Нет Да Нет
Сложность Минимальная Средняя Минимальная
Применение Real-time events Task queue Simple queue

Срок реализации

Redis Streams воркер для типичного PHP/Node.js приложения (email, уведомления): 1–2 дня. С мониторингом pending messages и алертами: 2–3 дня.