Настройка Dead Letter Queue для обработки ошибок

Наша компания занимается разработкой, поддержкой и обслуживанием сайтов любой сложности. От простых одностраничных сайтов до масштабных кластерных систем построенных на микро сервисах. Опыт разработчиков подтвержден сертификатами от вендоров.
Разработка и обслуживание любых видов сайтов:
Информационные сайты или веб-приложения
Сайты визитки, landing page, корпоративные сайты, онлайн каталоги, квиз, промо-сайты, блоги, новостные ресурсы, информационные порталы, форумы, агрегаторы
Сайты или веб-приложения электронной коммерции
Интернет-магазины, B2B-порталы, маркетплейсы, онлайн-обменники, кэшбэк-сайты, биржи, дропшиппинг-платформы, парсеры товаров
Веб-приложения для управления бизнес-процессами
CRM-системы, ERP-системы, корпоративные порталы, системы управления производством, парсеры информации
Сайты или веб-приложения электронных услуг
Доски объявлений, онлайн-школы, онлайн-кинотеатры, конструкторы сайтов, порталы предоставления электронных услуг, видеохостинги, тематические порталы

Это лишь некоторые из технических типов сайтов, с которыми мы работаем, и каждый из них может иметь свои специфические особенности и функциональность, а также быть адаптированным под конкретные потребности и цели клиента

Предлагаемые услуги
Показано 1 из 1 услугВсе 2065 услуг
Настройка Dead Letter Queue для обработки ошибок
Средняя
от 1 рабочего дня до 3 рабочих дней
Часто задаваемые вопросы
Наши компетенции:
Этапы разработки
Последние работы
  • image_website-b2b-advance_0.png
    Разработка сайта компании B2B ADVANCE
    1214
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    852
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1041
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    823
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Разработка веб-сайта для компании ФИКСПЕР
    815

Настройка Dead Letter Queue для обработки ошибок

Когда консьюмер не может обработать сообщение — оно не должно просто исчезать. Dead Letter Queue (DLQ) — это очередь, куда автоматически попадают сообщения, которые не удалось доставить: rejected, expired по TTL или превысившие лимит доставки.

Без DLQ потерянные сообщения неотслеживаемы. С DLQ — есть возможность разобрать ошибку, исправить и вернуть сообщение в обработку.

Механизм DLQ в RabbitMQ

Сообщение перемещается в Dead Letter Exchange при трёх условиях:

  1. Консьюмер вызвал basic.nack или basic.reject с requeue=false
  2. Истёк TTL сообщения (x-message-ttl на очереди или expiration в свойствах)
  3. Очередь переполнена (x-max-length или x-max-length-bytes)
# 1. Создаём Dead Letter Exchange
rabbitmqadmin declare exchange \
    name=dlx \
    type=direct \
    durable=true

# 2. Создаём DLQ
rabbitmqadmin declare queue \
    name=order-processing-dlq \
    durable=true \
    arguments='{"x-queue-type":"quorum","x-message-ttl":2592000000}'
    # 30 дней retention для анализа

# 3. Привязываем DLQ к DLX
rabbitmqadmin declare binding \
    source=dlx \
    destination=order-processing-dlq \
    routing_key=order-processing.failed

# 4. Основная очередь с указанием DLX
rabbitmqadmin declare queue \
    name=order-processing \
    durable=true \
    arguments='{
        "x-queue-type": "quorum",
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "order-processing.failed",
        "x-delivery-limit": 3
    }'
    # x-delivery-limit: после 3 попыток — в DLQ (только для quorum queues)

Retry с exponential backoff

Простой nack сразу возвращает сообщение в очередь — воркер снова берёт его и снова падает. Правильный подход — задержанный retry через цепочку очередей.

# Очередь задержки 1 минута
rabbitmqadmin declare queue \
    name=order-processing-retry-1m \
    durable=true \
    arguments='{
        "x-message-ttl": 60000,
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": "order-processing",
        "x-queue-type": "classic"
    }'
    # Сообщение истекает через 1 минуту → автоматически идёт в основную очередь

# Очередь задержки 10 минут
rabbitmqadmin declare queue \
    name=order-processing-retry-10m \
    durable=true \
    arguments='{
        "x-message-ttl": 600000,
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": "order-processing",
        "x-queue-type": "classic"
    }'

# Очередь задержки 1 час
rabbitmqadmin declare queue \
    name=order-processing-retry-1h \
    durable=true \
    arguments='{
        "x-message-ttl": 3600000,
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": "order-processing",
        "x-queue-type": "classic"
    }'

Логика в консьюмере:

function handleMessage(AMQPMessage $message): void
{
    $headers = $message->get('application_headers');
    $retryCount = $headers ? (int)($headers->getNativeData()['x-retry-count'] ?? 0) : 0;

    try {
        processOrder(json_decode($message->body, true));
        $message->ack();
    } catch (TemporaryException $e) {
        // Временная ошибка — retryable
        $retryCount++;

        if ($retryCount >= 3) {
            // Исчерпали попытки — в DLQ
            $message->nack(false);
            return;
        }

        // Отправляем в retry-очередь с задержкой
        $retryQueue = match($retryCount) {
            1 => 'order-processing-retry-1m',
            2 => 'order-processing-retry-10m',
            default => 'order-processing-retry-1h',
        };

        $retryMessage = new AMQPMessage(
            $message->body,
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'headers' => new AMQPTable(array_merge(
                    $headers ? $headers->getNativeData() : [],
                    [
                        'x-retry-count'  => $retryCount,
                        'x-original-queue' => 'order-processing',
                        'x-last-error'   => $e->getMessage(),
                        'x-retry-at'     => date('Y-m-d H:i:s'),
                    ]
                )),
            ]
        );

        $channel->basic_publish($retryMessage, '', $retryQueue);
        $message->ack(); // ack оригинал, чтобы не было дублей
    } catch (PermanentException $e) {
        // Постоянная ошибка — сразу в DLQ
        $message->nack(false);
        Log::error('Permanent failure, message sent to DLQ', [
            'order_id' => $payload['order_id'],
            'error' => $e->getMessage(),
        ]);
    }
}

Kafka DLQ

В Kafka нет встроенного механизма DLQ — он реализуется в коде консьюмера:

@Component
public class OrderEventConsumer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final String DLQ_TOPIC = "order-events-dlq";

    @KafkaListener(topics = "order-events", groupId = "order-processor")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            processOrder(record.value());
            ack.acknowledge();
        } catch (RetriableException e) {
            // Spring Kafka автоматически ретраит с backoff
            throw e; // не ack — SeekToCurrentErrorHandler возьмёт управление
        } catch (Exception e) {
            // Non-retriable — отправляем в DLQ
            sendToDlq(record, e);
            ack.acknowledge(); // ack оригинал чтобы не застрять
        }
    }

    private void sendToDlq(ConsumerRecord<String, String> original, Exception error) {
        Headers headers = new RecordHeaders(original.headers().toArray());
        headers.add("x-original-topic", original.topic().getBytes());
        headers.add("x-original-partition", String.valueOf(original.partition()).getBytes());
        headers.add("x-original-offset", String.valueOf(original.offset()).getBytes());
        headers.add("x-error-message", error.getMessage().getBytes());
        headers.add("x-failed-at", Instant.now().toString().getBytes());

        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            DLQ_TOPIC,
            null,
            original.key(),
            original.value(),
            headers
        );

        kafkaTemplate.send(dlqRecord);
        log.error("Sent to DLQ: topic={} partition={} offset={} error={}",
            original.topic(), original.partition(), original.offset(), error.getMessage());
    }
}

Конфигурация Spring Kafka с автоматическим retry:

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<?, ?> template) {
    // Exponential backoff: 1s, 2s, 4s, 8s, 16s
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
    backOff.setInitialInterval(1000L);
    backOff.setMultiplier(2.0);
    backOff.setMaxInterval(16000L);

    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (record, ex) -> new TopicPartition(record.topic() + "-dlq", record.partition() % 3)
    );

    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
    handler.addNotRetryableExceptions(
        JsonProcessingException.class,
        IllegalArgumentException.class
    );
    return handler;
}

Мониторинг DLQ

Алерт на рост DLQ — первый признак системной проблемы:

# Prometheus alert rule
- alert: DLQGrowth
  expr: rabbitmq_queue_messages{queue=~".*dlq.*"} > 100
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "DLQ {{ $labels.queue }} has {{ $value }} messages"

- alert: DLQSpike
  expr: rate(rabbitmq_queue_messages_published_total{queue=~".*dlq.*"}[5m]) > 10
  for: 2m
  labels:
    severity: critical

Инструмент для reprocess сообщений из DLQ

#!/bin/bash
# reprocess-dlq.sh — перемещаем сообщения из DLQ обратно в основную очередь

DLQ="order-processing-dlq"
TARGET_QUEUE="order-processing"
BATCH=100

for i in $(seq 1 $BATCH); do
    MESSAGE=$(rabbitmqadmin get queue=$DLQ ackmode=ack_requeue_false count=1 2>/dev/null)
    if [ -z "$MESSAGE" ]; then
        echo "DLQ is empty"
        break
    fi

    # Публикуем обратно в основную очередь
    BODY=$(echo "$MESSAGE" | python3 -c "import sys,json; msgs=json.load(sys.stdin); print(msgs[0]['payload'] if msgs else '')" 2>/dev/null)
    rabbitmqadmin publish exchange='' routing_key="$TARGET_QUEUE" payload="$BODY"
done

Таймлайн

День 1 — проектирование схемы DLQ: для каждой рабочей очереди — DLX, DLQ, очереди retry с TTL. Создание через CLI или Management API.

День 2 — интеграция retry-логики в консьюмеров, сохранение метаданных ошибки в заголовках (original topic, timestamp, error message).

День 3 — алерты на рост DLQ, инструмент для reprocess, документирование процедуры обработки DLQ-сообщений для команды поддержки.