前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ的死信队列

RabbitMQ的死信队列

原创
作者头像
会洗碗的CV工程师
发布2024-05-03 09:40:40
5740
发布2024-05-03 09:40:40
举报
文章被收录于专栏:消息中间件

一、概念

RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制。在RabbitMQ中,当消息出现以下情况时,它可能会被标记为死信:

  1. 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息可以被标记为死信。
  2. 消息被拒绝:当消费者调用RabbitMQ的basic.reject或basic.nack方法拒绝消息,并且requeue标志被设置为false时,消息也会被标记为死信。
  3. 消息过期:在RabbitMQ中,消息可以设置过期时间。如果消息在规定的时间内没有被消费,它会被认为是死信并被发送到死信队列。

为了处理这些死信,RabbitMQ引入了死信队列的概念。当消息被标记为死信后,如果配置了死信队列,RabbitMQ会将该消息发送到死信交换机(Dead Letter Exchange)。死信交换机再根据配置的路由键(Routing Key)将消息投递到指定的死信队列中。

在死信队列中,可以对消息进行重新处理、记录或丢弃等操作。例如,可以将死信消息重新发送到另一个队列以供其他消费者再次尝试处理,或者将消息记录到日志中以供后续分析。

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  1. 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。
  2. 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。
  3. 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。
  4. 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。
  5. 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

总的来说,死信队列是RabbitMQ中一个非常有用的特性,它可以帮助我们更好地处理消息消费失败的情况,提高系统的稳定性和可靠性。

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

二、创建死信队列

首先创建一个死信交换机和死信队列;普通交换机和普通队列,然后在创建普通队列时让其绑定死信交换机,并且设置队列的存活时间为15s,以及最大长度为10。具体代码实现如下:

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig4 {

    private final String DEAD_EXCHANGE = "dead_exchange";
    private final String DEAD_QUEUE = "dead_queue";
    private final String NORMAL_EXCHANGE = "normal_exchange";
    private final String NORMAL_QUEUE = "normal_queue";

    // 创建死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange getExchange(){
        return ExchangeBuilder
                // 交换机类型
                .topicExchange(DEAD_EXCHANGE)
                // 是否持久化
                .durable(true)
                .build();

    }

    // 创建死信队列
    @Bean(DEAD_QUEUE)
    public Queue getMessageQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)
                // 队列每条消息只能存活10s
                .ttl(10000)
                .build();
    }

    // 创建普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange getExchange1(){
        return ExchangeBuilder
                // 交换机类型
                .topicExchange(NORMAL_EXCHANGE)
                // 是否持久化
                .durable(true)
                .build();

    }

    // 创建普通队列
    @Bean(NORMAL_QUEUE)
    public Queue getMessageQueue2(){
        return QueueBuilder
                .durable(NORMAL_QUEUE)
                // 绑定死信交换机
                .deadLetterExchange(DEAD_EXCHANGE)
                // 死信队列路由关键字
                .deadLetterRoutingKey("dead_routing")
                // 队列每条消息只能存活15s
                .ttl(15000)
                // 队列最大长度10
                .maxLength(10)
                .build();
    }

    // 死信交换机绑定死信队列
    @Bean
    public Binding bindingMessageQueue3(@Qualifier(DEAD_EXCHANGE) Exchange exchange,
                                       @Qualifier(DEAD_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("dead_routing")
                .noargs();

    }

    // 普通交换机绑定普通队列
    @Bean
    public Binding bindingMessageQueue4(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
                                        @Qualifier(NORMAL_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("my_routing")
                .noargs();

    }
}

三、测试死信队列

接下来在测试方法中向普通交换机发送信息:

代码语言:javascript
复制
// 测试死信队列
@Test
public void testDead(){
    // 存活时间过期后变成死信队列
    rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}

在12秒内成功截图,还可以看得到队列消息,但是过了15s之后就看不到消息了。

接下来测试超过队列长度变成私信队列,代码如下:

代码语言:javascript
复制
// 测试死信队列
@Test
public void testDead(){
    // 存活时间过期后变成死信队列
    // rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    // 测试超过队列长度变成死信队列
    for (int i=0;i<20;i++){
        rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    }
}

执行代码,观察管控台

这里只保存9条消息,剩下的消息都变成死信队列。

接下来测试消息拒签后不返回队列变成死信。

代码语言:javascript
复制
    // 测试死信队列
    @Test
    public void testDead(){
        // 存活时间过期后变成死信队列
        // rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
        // 测试超过队列长度变成死信队列
//        for (int i=0;i<8;i++){
//            rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
//        }
        // 消费者拒签后不反悔变成死信
        rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    }
代码语言:javascript
复制
@Component
public class DlxConsumer {
    @RabbitListener(queues = "normal_queue")
    public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
        // 拒签消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
    }
}

执行结果,可以观察以下截图:

可以观察得到,执行测试代码两秒后消费者拒签消息,三秒后查看队列,也查看不了,总共没有超过队列的存活时间,而且也没有超过队列长度,但依旧看不到,证明消费者拒签后也会变成死信队列。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概念
  • 二、创建死信队列
  • 三、测试死信队列
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档