前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【消息队列】基于RabbitMQ实现延迟队列

【消息队列】基于RabbitMQ实现延迟队列

作者头像
千羽
发布2024-06-18 11:05:10
730
发布2024-06-18 11:05:10
举报
文章被收录于专栏:程序员千羽程序员千羽

最近我在京东看上了一款《Apple/苹果2024款MacBookAir 15英寸 M3(8+10核)16G1T深空灰轻薄笔记本电脑Z1GD0001N【定制】

然后下单还没付款,然后出现了付款倒计时的显示,其原理就用到了队列的延迟队列。

那么,RabbitMQ延迟队列是什么?

RabbitMQ延迟队列允许生产者发送消息时指定一个延迟时间,消费者不会立即收到消息,而是在指定的延迟时间之后才收到消息。

基于死信的延迟队列.drawio

RabbitMQ延迟队列的应用场景有以下几个方面:

  1. 订单超时处理:在电商平台等场景中,订单支付后需要在一定时间内完成配送。如果超过指定时间未完成,则需要进行相应的处理,如取消订单或重新安排配送。延迟队列可以用来监控订单的支付时间,并在超时后触发相应的处理逻辑。
  2. 消息通知:例如,在用户注册后发送欢迎邮件或短信的场景中,可以使用延迟队列来实现延时发送的效果。将发送消息放入延迟队列中,并设置一定的延迟时间后再执行发送操作。
  3. 延迟任务调度:例如,定期发送邮件或生成报表。通过将任务放入延迟队列并设置相应的延迟时间,可以在预定时间后自动执行任务。
  4. 秒杀活动的处理:在秒杀活动中,为了避免大量用户同时抢购导致的系统瘫痪,可以使用延迟队列来逐步处理订单信息。设置一定的延迟时间,将订单信息逐步发送到RabbitMQ中,以平滑处理流量高峰。
  5. 消息重试:当消息消费失败时,为了避免立即重试可能导致的重复消费和系统负载增加,可以将失败的消息放入延迟队列中。设置一定的延迟时间后再进行重试,这样可以给消费端一定的时间来处理其他任务,降低系统负载。

1. 如何实现RabbitMQ延迟队列?

这里有两个思路:

  • 方案1:借助消息超时时间+死信队列
  • 方案2:给RabbitMQ安装插件

这里我们采取方案2:给RabbitMQ安装插件

1.1 延迟插件简介

  • 官网地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  • 延迟极限:最多两天

2. rabbitmq-delayed插件安装

2.1 确定卷映射目录

代码语言:javascript
复制
docker inspect rabbitmq

运行结果:

代码语言:javascript
复制
"Mounts": [
            {
                "Type": "volume",
                "Name": "rabbitmq-plugin",
                "Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data",
                "Destination": "/plugins",
                "Driver": "local",
                "Mode": "z",
                "RW": true,
                "Propagation": ""
            },
            {
                "Type": "volume",
                "Name": "69a1071faed2b8b0e654a28bcf4fbb2cb9785d0ed2dc9a2d6ffd1d7c6a7835b4",
                "Source": "/var/lib/docker/volumes/69a1071faed2b8b0e654a28bcf4fbb2cb9785d0ed2dc9a2d6ffd1d7c6a7835b4/_data",
                "Destination": "/var/lib/rabbitmq",
                "Driver": "local",
                "Mode": "",
                "RW": true,
                "Propagation": ""
            }

和容器内/plugins目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data

2.2 下载延迟插件

官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html

下载插件安装文件:

代码语言:javascript
复制
# 下载
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 移动目录
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data

2.3 启用插件

代码语言:javascript
复制
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash

# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出Docker容器
exit

# 重启Docker容器
docker restart rabbitmq

2.4 确认

确认点1:查看当前节点已启用插件的列表:

确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了

3. 创建交换机

rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以,创建方式如下:

关于x-delayed-type参数的理解:

原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢? 这里就额外使用x-delayed-type来指定交换机本身的类型

4. 代码测试

4.1 生产者端代码

代码语言:javascript
复制
/**
 * 测试延迟队列
 */
@Test
public void test05SendMessageDelay() {
    // 创建消息后置处理器对象
    MessagePostProcessor postProcessor = message -> {
        // 设置消息过期时间(以毫秒为单位)
        // x-delay 参数必须基于 x-delayed-message-exchange 插件才能生效
        message.getMessageProperties().setHeader("x-delay", "10000"); // 10秒
        return message;
    };
    // 发送消息
    rabbitTemplate.convertAndSend(
            EXCHANGE_DELAY,
            ROUTING_KEY_DELAY,
            "Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date()),
            postProcessor);
}

4.2 消费者端代码

①情况A:资源已创建

代码语言:javascript
复制
package com.nateshao.consumer.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;  
  
@Component  
@Slf4j
public class MyDelayMessageListener {
    public static final String QUEUE_DELAY = "queue.delay.video";
    @RabbitListener(queues = {QUEUE_DELAY})
    public void process(String dataString, Message message, Channel channel) throws IOException {  
        log.info("[生产者]" + dataString);
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

②情况B:资源未创建

代码语言:javascript
复制
package com.nateshao.consumer.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.*;  
import org.springframework.stereotype.Component;  

import java.io.IOException;  
import java.text.SimpleDateFormat;  
import java.util.Date;  
  
@Component  
@Slf4j
/**
 * @Author 千羽
 * @公众号 程序员千羽
 * @Date 2024/5/29 16:00
 * @Version 1.0
 */
public class MyDelayMessageListener {  
  
    public static final String EXCHANGE_DELAY = "exchange.delay.video";
    public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
    public static final String QUEUE_DELAY = "queue.delay.video";
  
    @RabbitListener(bindings = @QueueBinding(  
        value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),  
        exchange = @Exchange(  
                value = EXCHANGE_DELAY,   
                durable = "true",   
                autoDelete = "false",   
                type = "x-delayed-message",   
                arguments = @Argument(name = "x-delayed-type", value = "direct")),  
        key = {ROUTING_KEY_DELAY}  
    ))  
    public void process(String dataString, Message message, Channel channel) throws IOException {  
        log.info("[生产者]" + dataString);  
        log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));  
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  
    }  
}

4.3 执行效果

①交换机类型

②生产者端效果

注意:使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行

③消费者端效果

5. 总结

基于RabbitMQ实现延迟队列主要用于处理需要延迟处理的消息,如订单超时、消息通知、任务调度等场景。

RabbitMQ提供了两种主要方式来实现延迟队列:

  1. 一是通过消息超时时间和死信队列的结合,
  2. 二是安装专门的延迟消息插件。

这里选择了第二种方案,即安装rabbitmq-delayed-message-exchange插件,该插件允许生产者发送消息时指定延迟时间,消费者将在指定的延迟时间后收到消息。插件的极限延迟时间可达两天,适用于多种需要延迟处理的业务场景。

创作不易,感谢大家支持!

参考链接:https://www.bilibili.com/video/BV1sw4m1U7Qe/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 千羽的编程时光 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 如何实现RabbitMQ延迟队列?
    • 1.1 延迟插件简介
    • 2. rabbitmq-delayed插件安装
      • 2.1 确定卷映射目录
        • 2.2 下载延迟插件
          • 2.3 启用插件
            • 2.4 确认
            • 3. 创建交换机
            • 4. 代码测试
              • 4.1 生产者端代码
                • 4.2 消费者端代码
                  • 4.3 执行效果
                    • ①交换机类型
                      • ②生产者端效果
                        • ③消费者端效果
                        • 5. 总结
                        相关产品与服务
                        消息队列
                        腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档