线上看到死信队列突然涨,别急着骂消费者。
我一般先看两样东西:一个是x-death头,另一个是消费端有没有把requeue写成false。RabbitMQ 里的消息不是“消费失败就进死信”,这句话太粗,真按这个理解,排障时很容易绕半天。
RabbitMQ 官方文档里把消息进入死信交换机的触发点归成几类:消费端拒绝且不重新入队、消息 TTL 过期、队列长度超限、Quorum Queue 超过投递次数限制;另外,整个队列过期时,队列里的消息不会被死信转发。这个细节挺容易漏。
先看一个比较常见的配置。
@Configuration
publicclassPayMqConfig{
publicstaticfinal String PAY_EXCHANGE = "pay.biz.ex";
publicstaticfinal String PAY_QUEUE = "pay.notify.q";
publicstaticfinal String DLX_EXCHANGE = "pay.dlx.ex";
publicstaticfinal String DLX_QUEUE = "pay.dlx.q";
@Bean
DirectExchange payExchange(){
returnnew DirectExchange(PAY_EXCHANGE, true, false);
}
@Bean
DirectExchange payDlxExchange(){
returnnew DirectExchange(DLX_EXCHANGE, true, false);
}
@Bean
Queue payNotifyQueue(){
return QueueBuilder.durable(PAY_QUEUE)
.deadLetterExchange(DLX_EXCHANGE)
.deadLetterRoutingKey("pay.dead")
.ttl(30_000)
.maxLength(50_000)
.build();
}
@Bean
Queue payDlxQueue(){
return QueueBuilder.durable(DLX_QUEUE).build();
}
@Bean
Binding payBinding(){
return BindingBuilder.bind(payNotifyQueue())
.to(payExchange())
.with("pay.notify");
}
@Bean
Binding payDlxBinding(){
return BindingBuilder.bind(payDlxQueue())
.to(payDlxExchange())
.with("pay.dead");
}
}
这段配置里,pay.notify.q才是真正的业务队列。它身上挂了两个东西:
x-dead-letter-exchange = pay.dlx.ex
x-dead-letter-routing-key = pay.dead
消息一旦被 RabbitMQ 判定为死信,就会从原队列重新投递到pay.dlx.ex,再根据pay.dead路由到pay.dlx.q。
这里要注意,死信交换机不是特殊交换机。它就是普通 exchange,只是被某个队列当成了“兜底出口”。
第一种情况,消费者明确拒绝,并且不让它回原队列。
这个是线上最常见的。比如支付回调消息来了,订单状态查不到,或者幂等校验发现数据对不上。这个时候我不太建议无脑重试,尤其是参数本身就是脏的,重试十次也没用。
@Component
publicclassPayNotifyConsumer{
@RabbitListener(queues = PayMqConfig.PAY_QUEUE, ackMode = "MANUAL")
publicvoidonMessage(Message message, Channel channel)throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
String body = new String(message.getBody(), StandardCharsets.UTF_8);
try {
PayNotifyEvent event = JsonUtil.read(body, PayNotifyEvent.class);
if (event.getOrderNo() == null || event.getPayNo() == null) {
log.warn("pay notify invalid body={}", body);
channel.basicNack(tag, false, false);
return;
}
payNotifyService.confirm(event);
channel.basicAck(tag, false);
} catch (TemporaryPayException e) {
log.warn("pay notify temporary error, requeue body={}", body, e);
channel.basicNack(tag, false, true);
} catch (Exception e) {
log.error("pay notify consume failed, send to dlx body={}", body, e);
channel.basicNack(tag, false, false);
}
}
}
关键就在最后一个参数。
basicNack(tag, false, true) -> 回原队列
basicNack(tag, false, false) -> 进死信交换机
basicReject也是一样,只要requeue=false,并且队列配置了 DLX,这条消息就会被转走。
我见过有人把所有异常都写成requeue=true,结果一条坏消息在队列里反复投递,消费者日志刷得飞快,业务一条没动。这个不叫可靠,这叫自转。
第二种情况,消息过期。
TTL 有两种写法,一种是队列级别,一种是消息级别。
队列级别就是刚才配置里的:
.ttl(30_000)
意思是消息在这个队列里待超过 30 秒,还没被消费,就会过期。队列上配置了 DLX,它就会进死信交换机。
消息级别一般在发送时设置:
publicvoidsendPayTimeoutCheck(String orderNo){
MessageProperties props = new MessageProperties();
props.setContentType(MessageProperties.CONTENT_TYPE_JSON);
props.setExpiration("60000");
props.setMessageId("pay-check-" + orderNo);
byte[] body = JsonUtil.writeBytes(Map.of("orderNo", orderNo));
rabbitTemplate.send(
PayMqConfig.PAY_EXCHANGE,
"pay.notify",
new Message(body, props)
);
}
这里的expiration("60000")是字符串,单位毫秒。这个地方我以前也嫌弃过,明明是数字,偏要传字符串,写错了还不一定第一眼看出来。
TTL 有个坑:不是你设置了 60 秒,消息就一定在第 60 秒准时进死信队列。它和队列类型、消息位置都有关系。排延迟问题时,别拿秒表卡 RabbitMQ。
第三种情况,队列满了。
比如配置了:
.maxLength(50_000)
当队列里消息数量超过限制,RabbitMQ 会把超出的消息处理掉。如果这个队列配置了死信交换机,被挤出去的消息就会进入 DLX。
这个场景我一般会先看监控。
queue=pay.notify.q
messages_ready=50000
messages_unacked=12
publish_rate=1800/s
deliver_rate=200/s
这种不是死信队列的问题,是消费者扛不住了。你盯着死信队列消费脚本改半天没用,前面业务队列已经堵死了。
第四种情况,Quorum Queue 的投递次数超过限制。
普通经典队列里,RabbitMQ 不会天然帮你数“这条消息失败了几次就扔死信”。很多项目是自己在 header 里塞 retry 次数,或者靠 TTL + DLX 做延迟重试。
但 Quorum Queue 支持 delivery-limit。超过限制后,消息会被死信处理。
类似这样:
@Bean
Queue payQuorumQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
args.put("x-delivery-limit", 5);
args.put("x-dead-letter-exchange", PayMqConfig.DLX_EXCHANGE);
args.put("x-dead-letter-routing-key", "pay.dead");
returnnew Queue("pay.quorum.q", true, false, false, args);
}
这种更适合那些“最多试几次,不行就人工处理”的消息。别让一条毒消息一直卡在消费链路里。
死信到了队列里,我一般不会马上补偿,先把头信息打出来。
@RabbitListener(queues = PayMqConfig.DLX_QUEUE)
publicvoidonDeadMessage(Message message){
MessageProperties props = message.getMessageProperties();
log.warn("dead message received, msgId={}, routingKey={}, xDeath={}, body={}",
props.getMessageId(),
props.getReceivedRoutingKey(),
props.getHeaders().get("x-death"),
new String(message.getBody(), StandardCharsets.UTF_8));
}
x-death里通常能看到原因,比如:
reason=rejected
reason=expired
reason=maxlen
这个比看代码猜强多了。
最后再补一句,下面这些情况不会自动进死信:
消费者 ack 成功了,不会进死信
basicNack requeue=true,不会进死信
消息发到 exchange 但没有路由到队列,不等于队列死信
整个队列过期删除,里面的消息不会逐条进死信
RabbitMQ 的死信机制不是垃圾桶,更像事故现场的暂存区。
配置 DLX 只是第一步,真正要看的还是这条消息为什么死:是业务参数脏了,是消费者异常了,是 TTL 到了,还是队列被打满了。
别上来就写个定时任务把死信队列重新塞回业务队列。那种操作,看着像补偿,实际上很可能只是把事故重新播放一遍。