前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ延迟队列

RabbitMQ延迟队列

原创
作者头像
会洗碗的CV工程师
发布2024-05-04 11:10:13
1820
发布2024-05-04 11:10:13
举报
文章被收录于专栏:消息中间件

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

RabbitMQ本身并不直接支持延迟队列(Delayed Message Queue)的功能,但你可以通过结合RabbitMQ的TTL(Time-To-Live,消息过期时间)和死信队列(Dead Letter Queue, DLQ)的特性来模拟实现延迟队列的效果。

以下是一个基于RabbitMQ TTL和DLQ实现延迟队列的步骤:

1. 配置RabbitMQ

1.1 创建普通队列

这个队列将用于接收并尝试消费消息。如果消息在一定时间内没有被消费或者消费失败,它们将被发送到死信队列。

1.2 创建死信队列(DLQ)

这个队列将用于接收来自普通队列的死信消息。可以在这里设置消费者来处理延迟的消息。

1.3 创建交换机

需要一个交换机来将消息从生产者路由到普通队列,并且还需要一个交换机(可以是同一个交换机)来将死信从普通队列路由到死信队列。

1.4 绑定队列和交换机

将普通队列和死信队列分别绑定到相应的交换机上,并设置正确的路由键。具体代码如下:

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {

    private final String ORDER_EXCHANGE = "order_exchange";
    private final String ORDER_QUEUE = "order_queue";
    private final String EXPIRE_EXCHANGE = "expire_exchange";
    private final String EXPIRE_QUEUE = "expire_queue";

    // 创建过期交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange getExchange(){
        return ExchangeBuilder
                // 交换机类型
                .topicExchange(EXPIRE_EXCHANGE)
                // 是否持久化
                .durable(true)
                .build();

    }

    // 创建过期队列
    @Bean(EXPIRE_QUEUE)
    public Queue getMessageQueue(){
        return QueueBuilder
                .durable(EXPIRE_QUEUE)
                .build();
    }

    // 创建订单交换机
    @Bean(ORDER_EXCHANGE)
    public Exchange getExchange1(){
        return ExchangeBuilder
                // 交换机类型
                .topicExchange(ORDER_EXCHANGE)
                // 是否持久化
                .durable(true)
                .build();

    }

    // 创建订单队列
    @Bean(ORDER_QUEUE)
    public Queue getMessageQueue2(){
        return QueueBuilder
                .durable(ORDER_QUEUE)
                // 绑定过期交换机
                .deadLetterExchange(EXPIRE_EXCHANGE)
                // 过期队列路由关键字
                .deadLetterRoutingKey("expire_routing")
                // 队列每条消息只能存活15s
                .ttl(15000)
                // 队列最大长度10
                .maxLength(10)
                .build();
    }

    // 过期交换机绑定过期队列
    @Bean
    public Binding bindingMessageQueue5(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,
                                       @Qualifier(EXPIRE_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("expire_routing")
                .noargs();

    }

    // 订单交换机绑定订单队列
    @Bean
    public Binding bindingMessageQueue4(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
                                        @Qualifier(ORDER_QUEUE) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("order_routing")
                .noargs();

    }
}

2. 设置消息TTL

在发送消息到普通队列时,为消息设置一个TTL(Time-To-Live)。当消息在队列中等待的时间超过TTL时,它将被视为死信并被发送到死信队列。

这里在上诉代码中已经设置了15s的存活时间。

3. 发送消息

使用RabbitMQ的客户端库(如Spring AMQP的RabbitTemplate)发送消息到普通队列,并设置消息的TTL。

代码语言:javascript
复制
// 下单
@GetMapping("/place/{orderId}")
public String placeOrder(@PathVariable String orderId){
    System.out.println(LocalDateTime.now()+"处理订单数据.....");
    // 向rabbitmq发送订单id
    rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);
    return LocalDateTime.now()+"下单成功,修改库存";
}

这里发送了下单请求

4. 监听生产者

在死信队列的消费者中处理延迟的消息。

代码语言:javascript
复制
@Component
public class ExpireOrderConsumer {
    // 消费者2
    @RabbitListener(queues = "expire_queue")
    public void listenMessage2(String orderId){
        System.out.println(LocalDateTime.now()+"查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
    }
}

5. 测试

访问接口。

这里可以看得到,15s之后监听订单的消息,可以知道订单是否处理。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 配置RabbitMQ
    • 1.1 创建普通队列
      • 1.2 创建死信队列(DLQ)
        • 1.3 创建交换机
          • 1.4 绑定队列和交换机
          • 2. 设置消息TTL
          • 3. 发送消息
          • 4. 监听生产者
          • 5. 测试
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档