前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 延迟队列

RabbitMQ 延迟队列

作者头像
pollyduan
发布2019-11-04 15:03:36
7070
发布2019-11-04 15:03:36
举报
文章被收录于专栏:云架构修炼手册

rabbitmq 本身不支持延迟队列,但提供了实现延迟队列的必备条件。

原理

  1. queue可以通过 x-message-ttl 参数设置过期时间,到了过期时间的消息就会被标记为 dead letter 状态。
  2. 过期的消息可以通过 x-dead-letter-exchangex-dead-letter-routing-key 参数转发到另一个 exchange 中去。

在management 中测试延迟队列

docker 部署测试rabbitmq

  • 如果已经有现成的rabbitmq,本小节跳过。

docker-compose.yml

代码语言:javascript
复制
version: "3"
services:
  rabbitmq:
    image: rabbitmq:3.7.12-management-alpine
    container_name: rabbitmq
    ports:
    - 15672:15672
    - 5671-5672:5671-5672

启动:

代码语言:javascript
复制
docker-compose up -d

创建出队队列

  1. 选择 Queues 面板,打开 Add a new queue 栏目。
  2. 输入如下队列数据:
代码语言:javascript
复制
Name: delay_queue_deque

创建exchange

  1. 选择 Exchanges 面板,打开 Add a new exchange 栏目。
  2. 输入如下队列数据:
代码语言:javascript
复制
Name: delay_exchange
  1. 添加 Binding:
代码语言:javascript
复制
点击新创建的 delay_exchange;
打开 Bindings 栏目;
选择 "To queue" ,并输入值 "delay_queue_deque";
"Routing key:" 中输入 "delay_routing_key"
  • To queue 要和前面的出队队列配置的 Name 的值相同。

创建入队队列

  1. 选择 Queues 面板,打开 Add a new queue 栏目。
  2. 输入如下队列数据:
代码语言:javascript
复制
Name: delay_queue_enque
Arguments:
  x-message-ttl 30000
  x-dead-letter-exchange delay_exchange
  x-dead-letter-routing-key delay_routing_key
  • 添加参数时,可点击下面add 工具栏里的链接,会自动填充前面的参数名。
  • x-dead-letter-exchange 要和前面的 exchange 配置的 Name 的值相同。
  • 参数 x-dead-letter-routing-key 要和 exchange 添加的Binding 的 Routing key 的值相同。

测试

在 Queues 面板中,点击 delay_queue_enque 队列。

找到 Publish message,在 payload 中输入测试内容:"hello-001",点击 Publish message 按钮。

点击 "Queues" 面板按钮,就会看到delay_queue_enque 队列中有了一条数据,Ready:1。

等待30秒,消息就会转到 delay_queue_deque 中去了。

点击 delay_queue_deque ,找到 Get messages 栏目,点击 Get messages 按钮,检查取出的数据,为publish的message,测试无误。

  • 测试时会发现,get messages之后,消息还在队列里。这是因为默认的应答策略是:Nack message requeue true,改为 Ack message requeue false并再次获取消息 ,消息就会真的被消费掉了。

java 测试

在management 中的测试理解了,在java中的代码就容易理解了。

创建springboot项目

引入 RabbitMQ组件,或者手动加入:

代码语言:javascript
复制
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置队列

代码语言:javascript
复制
private static final String DEQUE_QUEUE_NAME = "dequeQueue";
private static final String ENQUE_QUEUE_NAME = "enqueQueue";
private static final String DEQUE_QUEUE_NAME_KEY = "dequeQueueKey";
private static final String DELAY_EXCHANGE="exchange_delay";
//死信路由
  @Bean
  DirectExchange exchange() {
      return new DirectExchange(DELAY_EXCHANGE);
  }

  //用于延时消费的队列
  @Bean
  public Queue dequeQueue() {
      Queue queue = new Queue(DEQUE_QUEUE_NAME,true,false,false);
      return queue;
  }

  //绑定exchange 到出队队列
  @Bean
  public Binding  deadLetterBinding() {
      return BindingBuilder.bind(dequeQueue()).to(exchange()).with(DEQUE_QUEUE_NAME_KEY);
  }

  //配置死信队列,即入队队列
  @Bean
  public Queue deadLetterQueue() {
      Map<String,Object> args = new HashMap<>();
      args.put("x-message-ttl", 20000);
      args.put("x-dead-letter-exchange", DELAY_EXCHANGE);
      args.put("x-dead-letter-routing-key", DEQUE_QUEUE_NAME_KEY);
      return new Queue(ENQUE_QUEUE_NAME, true, false, false, args);
  }

发送消息

代码语言:javascript
复制
package com.pollyduan.rabbitmq;

import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
	@Autowired
    private AmqpTemplate rabbitTemplate;

	@GetMapping("send")
	public String send() {
		String msg="hello-001";
		System.out.println("发送消息:"+LocalDateTime.now().toString()+" 内容:"+msg);
        rabbitTemplate.convertAndSend("enqueQueue", msg);
        return "ok";
	}
}

接收消息

代码语言:javascript
复制
package com.pollyduan.rabbitmq;

import java.time.LocalDateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageHandler {
	@RabbitListener(queues = "dequeQueue")
    public void process(String msg) {
        System.out.println("接收消息:"+LocalDateTime.now().toString()+" 内容:"+msg);
    }
}

测试java实例

访问入队实例地址: http://localhost:8080/send

查看运行日志,可以看到:

代码语言:javascript
复制
发送消息:2018-12-08T19:18:05.730 msg内容:hello-001

观察日志界面不动,随后会打印一条:

代码语言:javascript
复制
接收消息:2018-12-08T19:18:25.762 接收内容:hello-001

比较两条日志的时间差:20 秒,测试无误。

单条消息的过期

单条消息发送时也可以指定本消息的过期时间,那么队列过期时间和消息过期时间同时配置的时候,以时间短的限制为准。

我们要在消息中附带一些参数,使用String 类型消息就不够了。

修改出队服务:

代码语言:javascript
复制
package com.pollyduan.rabbitmq;

import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageHandler {
	@RabbitListener(queues = "dequeQueue")
	public void process(Message message) throws UnsupportedEncodingException {
		String msg=new String(message.getBody(),"UTF-8");
		System.out.println("接收消息:"+LocalDateTime.now().toString()+" 接收内容:"+msg);
	}
}

修改入队接口:

代码语言:javascript
复制
package com.pollyduan.rabbitmq;

import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
	@Autowired
    private AmqpTemplate rabbitTemplate;

	@GetMapping("send")
	public String send() throws UnsupportedEncodingException {
		Message message = MessageBuilder.withBody("单独指定过期时间的消息".getBytes("UTF-8")).build();
		message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
		message.getMessageProperties().setExpiration("10000");

        rabbitTemplate.convertAndSend("enqueQueue", message);
        System.out.println("发送消息:"+LocalDateTime.now().toString()+" 内容:"+new String(message.getBody(),"UTF-8"));
        return "ok";
	}
}

那么队列的过期时间是20秒,这里设置的消息过期时间是10 秒,测试就会发现10秒的时候,消息就被消费掉了。

同样,修改消息过期时间为30秒,则会在20秒的时候消息过期。

单条消息过期是不可靠的

如果为单条消息设置过期,实际上是不可靠的。比如:

消息1: hello-001 过期时间:10 秒 消息2: hello-002 过期时间:5 秒

我们希望的是:消息2 先过期,消息1 后过期,那么预期的目标是在 dequeue里先拿到 消息2,后拿到 消息1。经过测试你会发现这是做不到的。

因为实际上enqueue 还是按照FIFO顺序处理的,就是说,直到 消息1 到期,判断为死信,处理;然后才会处理 消息2,因此即便后发的消息过期时间短,也不会被提前处理。

综上,为单条消息设置过期时间是不可靠的。优先选择使用队列的延迟机制。

典型应用

订单到期自动取消 消息延时同步 延迟检查状态

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 原理
  • 在management 中测试延迟队列
    • docker 部署测试rabbitmq
      • 创建出队队列
        • 创建exchange
          • 创建入队队列
            • 测试
            • java 测试
              • 创建springboot项目
                • 配置队列
                  • 发送消息
                    • 接收消息
                      • 测试java实例
                        • 单条消息的过期
                          • 单条消息过期是不可靠的
                          • 典型应用
                          相关产品与服务
                          容器服务
                          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档