Настройка Dead Letter Queue для обработки ошибок
Когда консьюмер не может обработать сообщение — оно не должно просто исчезать. Dead Letter Queue (DLQ) — это очередь, куда автоматически попадают сообщения, которые не удалось доставить: rejected, expired по TTL или превысившие лимит доставки.
Без DLQ потерянные сообщения неотслеживаемы. С DLQ — есть возможность разобрать ошибку, исправить и вернуть сообщение в обработку.
Механизм DLQ в RabbitMQ
Сообщение перемещается в Dead Letter Exchange при трёх условиях:
- Консьюмер вызвал
basic.nackилиbasic.rejectсrequeue=false - Истёк TTL сообщения (
x-message-ttlна очереди илиexpirationв свойствах) - Очередь переполнена (
x-max-lengthилиx-max-length-bytes)
# 1. Создаём Dead Letter Exchange
rabbitmqadmin declare exchange \
name=dlx \
type=direct \
durable=true
# 2. Создаём DLQ
rabbitmqadmin declare queue \
name=order-processing-dlq \
durable=true \
arguments='{"x-queue-type":"quorum","x-message-ttl":2592000000}'
# 30 дней retention для анализа
# 3. Привязываем DLQ к DLX
rabbitmqadmin declare binding \
source=dlx \
destination=order-processing-dlq \
routing_key=order-processing.failed
# 4. Основная очередь с указанием DLX
rabbitmqadmin declare queue \
name=order-processing \
durable=true \
arguments='{
"x-queue-type": "quorum",
"x-dead-letter-exchange": "dlx",
"x-dead-letter-routing-key": "order-processing.failed",
"x-delivery-limit": 3
}'
# x-delivery-limit: после 3 попыток — в DLQ (только для quorum queues)
Retry с exponential backoff
Простой nack сразу возвращает сообщение в очередь — воркер снова берёт его и снова падает. Правильный подход — задержанный retry через цепочку очередей.
# Очередь задержки 1 минута
rabbitmqadmin declare queue \
name=order-processing-retry-1m \
durable=true \
arguments='{
"x-message-ttl": 60000,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": "order-processing",
"x-queue-type": "classic"
}'
# Сообщение истекает через 1 минуту → автоматически идёт в основную очередь
# Очередь задержки 10 минут
rabbitmqadmin declare queue \
name=order-processing-retry-10m \
durable=true \
arguments='{
"x-message-ttl": 600000,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": "order-processing",
"x-queue-type": "classic"
}'
# Очередь задержки 1 час
rabbitmqadmin declare queue \
name=order-processing-retry-1h \
durable=true \
arguments='{
"x-message-ttl": 3600000,
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": "order-processing",
"x-queue-type": "classic"
}'
Логика в консьюмере:
function handleMessage(AMQPMessage $message): void
{
$headers = $message->get('application_headers');
$retryCount = $headers ? (int)($headers->getNativeData()['x-retry-count'] ?? 0) : 0;
try {
processOrder(json_decode($message->body, true));
$message->ack();
} catch (TemporaryException $e) {
// Временная ошибка — retryable
$retryCount++;
if ($retryCount >= 3) {
// Исчерпали попытки — в DLQ
$message->nack(false);
return;
}
// Отправляем в retry-очередь с задержкой
$retryQueue = match($retryCount) {
1 => 'order-processing-retry-1m',
2 => 'order-processing-retry-10m',
default => 'order-processing-retry-1h',
};
$retryMessage = new AMQPMessage(
$message->body,
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => new AMQPTable(array_merge(
$headers ? $headers->getNativeData() : [],
[
'x-retry-count' => $retryCount,
'x-original-queue' => 'order-processing',
'x-last-error' => $e->getMessage(),
'x-retry-at' => date('Y-m-d H:i:s'),
]
)),
]
);
$channel->basic_publish($retryMessage, '', $retryQueue);
$message->ack(); // ack оригинал, чтобы не было дублей
} catch (PermanentException $e) {
// Постоянная ошибка — сразу в DLQ
$message->nack(false);
Log::error('Permanent failure, message sent to DLQ', [
'order_id' => $payload['order_id'],
'error' => $e->getMessage(),
]);
}
}
Kafka DLQ
В Kafka нет встроенного механизма DLQ — он реализуется в коде консьюмера:
@Component
public class OrderEventConsumer {
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String DLQ_TOPIC = "order-events-dlq";
@KafkaListener(topics = "order-events", groupId = "order-processor")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processOrder(record.value());
ack.acknowledge();
} catch (RetriableException e) {
// Spring Kafka автоматически ретраит с backoff
throw e; // не ack — SeekToCurrentErrorHandler возьмёт управление
} catch (Exception e) {
// Non-retriable — отправляем в DLQ
sendToDlq(record, e);
ack.acknowledge(); // ack оригинал чтобы не застрять
}
}
private void sendToDlq(ConsumerRecord<String, String> original, Exception error) {
Headers headers = new RecordHeaders(original.headers().toArray());
headers.add("x-original-topic", original.topic().getBytes());
headers.add("x-original-partition", String.valueOf(original.partition()).getBytes());
headers.add("x-original-offset", String.valueOf(original.offset()).getBytes());
headers.add("x-error-message", error.getMessage().getBytes());
headers.add("x-failed-at", Instant.now().toString().getBytes());
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
null,
original.key(),
original.value(),
headers
);
kafkaTemplate.send(dlqRecord);
log.error("Sent to DLQ: topic={} partition={} offset={} error={}",
original.topic(), original.partition(), original.offset(), error.getMessage());
}
}
Конфигурация Spring Kafka с автоматическим retry:
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<?, ?> template) {
// Exponential backoff: 1s, 2s, 4s, 8s, 16s
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
backOff.setInitialInterval(1000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(16000L);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition(record.topic() + "-dlq", record.partition() % 3)
);
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
handler.addNotRetryableExceptions(
JsonProcessingException.class,
IllegalArgumentException.class
);
return handler;
}
Мониторинг DLQ
Алерт на рост DLQ — первый признак системной проблемы:
# Prometheus alert rule
- alert: DLQGrowth
expr: rabbitmq_queue_messages{queue=~".*dlq.*"} > 100
for: 5m
labels:
severity: warning
annotations:
summary: "DLQ {{ $labels.queue }} has {{ $value }} messages"
- alert: DLQSpike
expr: rate(rabbitmq_queue_messages_published_total{queue=~".*dlq.*"}[5m]) > 10
for: 2m
labels:
severity: critical
Инструмент для reprocess сообщений из DLQ
#!/bin/bash
# reprocess-dlq.sh — перемещаем сообщения из DLQ обратно в основную очередь
DLQ="order-processing-dlq"
TARGET_QUEUE="order-processing"
BATCH=100
for i in $(seq 1 $BATCH); do
MESSAGE=$(rabbitmqadmin get queue=$DLQ ackmode=ack_requeue_false count=1 2>/dev/null)
if [ -z "$MESSAGE" ]; then
echo "DLQ is empty"
break
fi
# Публикуем обратно в основную очередь
BODY=$(echo "$MESSAGE" | python3 -c "import sys,json; msgs=json.load(sys.stdin); print(msgs[0]['payload'] if msgs else '')" 2>/dev/null)
rabbitmqadmin publish exchange='' routing_key="$TARGET_QUEUE" payload="$BODY"
done
Таймлайн
День 1 — проектирование схемы DLQ: для каждой рабочей очереди — DLX, DLQ, очереди retry с TTL. Создание через CLI или Management API.
День 2 — интеграция retry-логики в консьюмеров, сохранение метаданных ошибки в заголовках (original topic, timestamp, error message).
День 3 — алерты на рост DLQ, инструмент для reprocess, документирование процедуры обработки DLQ-сообщений для команды поддержки.







