RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制。在RabbitMQ中,当消息出现以下情况时,它可能会被标记为死信:
为了处理这些死信,RabbitMQ引入了死信队列的概念。当消息被标记为死信后,如果配置了死信队列,RabbitMQ会将该消息发送到死信交换机(Dead Letter Exchange)。死信交换机再根据配置的路由键(Routing Key)将消息投递到指定的死信队列中。
在死信队列中,可以对消息进行重新处理、记录或丢弃等操作。例如,可以将死信消息重新发送到另一个队列以供其他消费者再次尝试处理,或者将消息记录到日志中以供后续分析。
死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:
总的来说,死信队列是RabbitMQ中一个非常有用的特性,它可以帮助我们更好地处理消息消费失败的情况,提高系统的稳定性和可靠性。
在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
首先创建一个死信交换机和死信队列;普通交换机和普通队列,然后在创建普通队列时让其绑定死信交换机,并且设置队列的存活时间为15s,以及最大长度为10。具体代码实现如下:
@Configuration
public class RabbitMQConfig4 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
// 创建死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange getExchange(){
return ExchangeBuilder
// 交换机类型
.topicExchange(DEAD_EXCHANGE)
// 是否持久化
.durable(true)
.build();
}
// 创建死信队列
@Bean(DEAD_QUEUE)
public Queue getMessageQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
// 队列每条消息只能存活10s
.ttl(10000)
.build();
}
// 创建普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange getExchange1(){
return ExchangeBuilder
// 交换机类型
.topicExchange(NORMAL_EXCHANGE)
// 是否持久化
.durable(true)
.build();
}
// 创建普通队列
@Bean(NORMAL_QUEUE)
public Queue getMessageQueue2(){
return QueueBuilder
.durable(NORMAL_QUEUE)
// 绑定死信交换机
.deadLetterExchange(DEAD_EXCHANGE)
// 死信队列路由关键字
.deadLetterRoutingKey("dead_routing")
// 队列每条消息只能存活15s
.ttl(15000)
// 队列最大长度10
.maxLength(10)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindingMessageQueue3(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
@Qualifier(DEAD_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindingMessageQueue4(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
@Qualifier(NORMAL_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
接下来在测试方法中向普通交换机发送信息:
// 测试死信队列
@Test
public void testDead(){
// 存活时间过期后变成死信队列
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
在12秒内成功截图,还可以看得到队列消息,但是过了15s之后就看不到消息了。
接下来测试超过队列长度变成私信队列,代码如下:
// 测试死信队列
@Test
public void testDead(){
// 存活时间过期后变成死信队列
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 测试超过队列长度变成死信队列
for (int i=0;i<20;i++){
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
}
执行代码,观察管控台
这里只保存9条消息,剩下的消息都变成死信队列。
接下来测试消息拒签后不返回队列变成死信。
// 测试死信队列
@Test
public void testDead(){
// 存活时间过期后变成死信队列
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 测试超过队列长度变成死信队列
// for (int i=0;i<8;i++){
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// }
// 消费者拒签后不反悔变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
@Component
public class DlxConsumer {
@RabbitListener(queues = "normal_queue")
public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
// 拒签消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
执行结果,可以观察以下截图:
可以观察得到,执行测试代码两秒后消费者拒签消息,三秒后查看队列,也查看不了,总共没有超过队列的存活时间,而且也没有超过队列长度,但依旧看不到,证明消费者拒签后也会变成死信队列。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。