Настройка Redis Pub/Sub и Streams как очереди сообщений
Redis предоставляет два механизма для асинхронных сообщений: Pub/Sub — простой fire-and-forget без персистентности, и Streams — персистентная очередь с группами потребителей, похожая на облегчённый Kafka.
Redis Pub/Sub
Подходит для real-time уведомлений внутри приложения. Сообщения не сохраняются — если подписчик отключён, сообщение теряется.
// Laravel: публикация через Redis Pub/Sub
use Illuminate\Support\Facades\Redis;
// Publisher
Redis::publish('user-notifications', json_encode([
'user_id' => $userId,
'type' => 'order.shipped',
'message' => 'Ваш заказ отправлен',
]));
// Subscriber (console command)
class RedisSubscribeCommand extends Command
{
protected $signature = 'redis:subscribe';
public function handle(): void
{
Redis::subscribe(['user-notifications'], function (string $message) {
$data = json_decode($message, true);
broadcast(new UserNotificationEvent($data)); // → WebSocket
});
}
}
Redis Streams
Streams — правильный выбор для task queue на Redis. Сообщения хранятся в потоке, consumer groups отслеживают прогресс, pending entries — необработанные сообщения.
# Создать поток и добавить сообщение
XADD emails * user_id 123 email [email protected] template welcome
# Создать consumer group
XGROUP CREATE emails email-workers $ MKSTREAM
# Читать новые сообщения (воркер 1)
XREADGROUP GROUP email-workers worker-1 COUNT 10 BLOCK 5000 STREAMS emails >
# Подтвердить обработку
XACK emails email-workers <message-id>
PHP: Redis Streams воркер
use Illuminate\Support\Facades\Redis;
class RedisStreamWorker
{
private string $stream = 'emails';
private string $group = 'email-workers';
private string $consumer;
public function __construct()
{
$this->consumer = gethostname() . ':' . getmypid();
$this->ensureGroup();
}
private function ensureGroup(): void
{
try {
Redis::xgroup('CREATE', $this->stream, $this->group, '$', true);
} catch (\Throwable) {
// Группа уже существует
}
}
public function run(): void
{
while (true) {
// Сначала обработать pending (не подтверждённые с прошлого запуска)
$pending = Redis::xreadgroup(
$this->group, $this->consumer,
[$this->stream => '0'], // '0' = pending messages
10
);
$this->processMessages($pending);
// Затем новые сообщения
$messages = Redis::xreadgroup(
$this->group, $this->consumer,
[$this->stream => '>'], // '>' = only new
10,
5000 // блокировка 5 секунд
);
$this->processMessages($messages);
}
}
private function processMessages(?array $streams): void
{
if (!$streams) return;
foreach ($streams[$this->stream] ?? [] as [$id, $fields]) {
try {
$this->handleEmail($fields);
Redis::xack($this->stream, $this->group, $id);
} catch (\Throwable $e) {
Log::error('Stream message failed', ['id' => $id, 'error' => $e->getMessage()]);
// Сообщение остаётся в pending — будет перечитано при следующем запуске
}
}
}
private function handleEmail(array $fields): void
{
Mail::to($fields['email'])->send(new TemplateMail($fields['template'], $fields));
}
}
Node.js: ioredis Streams
import Redis from 'ioredis';
const redis = new Redis({ host: 'redis', port: 6379 });
const STREAM = 'emails';
const GROUP = 'email-workers';
const CONSUMER = `worker-${process.pid}`;
async function startWorker(): Promise<void> {
// Создать группу если не существует
try {
await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
} catch { /* group exists */ }
while (true) {
const messages = await redis.xreadgroup(
'GROUP', GROUP, CONSUMER,
'COUNT', '10',
'BLOCK', '5000',
'STREAMS', STREAM, '>'
) as [string, [string, string[]][]][] | null;
if (!messages) continue;
for (const [, entries] of messages) {
for (const [id, fields] of entries) {
const data = Object.fromEntries(
fields.reduce((acc, val, i) => (i % 2 === 0 ? acc.push([val, fields[i+1]]) : acc, acc), [] as [string,string][])
);
try {
await sendEmail(data);
await redis.xack(STREAM, GROUP, id);
} catch (err) {
console.error('Email failed:', id, err);
}
}
}
}
}
Trimming и очистка потока
# Обрезать поток до 10000 последних сообщений
XTRIM emails MAXLEN ~ 10000
# Автоматически при добавлении
XADD emails MAXLEN ~ 100000 * user_id 123 template welcome
Сравнение Redis механизмов
| Характеристика | Pub/Sub | Streams | Lists (LPUSH/BRPOP) |
|---|---|---|---|
| Персистентность | Нет | Да | Да |
| Consumer groups | Нет | Да | Нет |
| Replay истории | Нет | Да | Нет |
| Сложность | Минимальная | Средняя | Минимальная |
| Применение | Real-time events | Task queue | Simple queue |
Срок реализации
Redis Streams воркер для типичного PHP/Node.js приложения (email, уведомления): 1–2 дня. С мониторингом pending messages и алертами: 2–3 дня.







