rabbitmq 本身不支持延迟队列,但提供了实现延迟队列的必备条件。
x-message-ttl
参数设置过期时间,到了过期时间的消息就会被标记为 dead letter
状态。x-dead-letter-exchange
和 x-dead-letter-routing-key
参数转发到另一个 exchange
中去。docker-compose.yml
version: "3"
services:
rabbitmq:
image: rabbitmq:3.7.12-management-alpine
container_name: rabbitmq
ports:
- 15672:15672
- 5671-5672:5671-5672
启动:
docker-compose up -d
Queues
面板,打开 Add a new queue
栏目。Name: delay_queue_deque
Exchanges
面板,打开 Add a new exchange
栏目。Name: delay_exchange
点击新创建的 delay_exchange;
打开 Bindings 栏目;
选择 "To queue" ,并输入值 "delay_queue_deque";
"Routing key:" 中输入 "delay_routing_key"
To queue
要和前面的出队队列配置的 Name
的值相同。Queues
面板,打开 Add a new queue
栏目。Name: delay_queue_enque
Arguments:
x-message-ttl 30000
x-dead-letter-exchange delay_exchange
x-dead-letter-routing-key delay_routing_key
x-dead-letter-exchange
要和前面的 exchange 配置的 Name
的值相同。x-dead-letter-routing-key
要和 exchange 添加的Binding 的 Routing key
的值相同。在 Queues 面板中,点击 delay_queue_enque
队列。
找到 Publish message
,在 payload
中输入测试内容:"hello-001",点击 Publish message
按钮。
点击 "Queues" 面板按钮,就会看到delay_queue_enque
队列中有了一条数据,Ready:1。
等待30秒,消息就会转到 delay_queue_deque
中去了。
点击 delay_queue_deque
,找到 Get messages
栏目,点击 Get messages
按钮,检查取出的数据,为publish的message,测试无误。
Nack message requeue true
,改为 Ack message requeue false
并再次获取消息 ,消息就会真的被消费掉了。在management 中的测试理解了,在java中的代码就容易理解了。
引入 RabbitMQ
组件,或者手动加入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
private static final String DEQUE_QUEUE_NAME = "dequeQueue";
private static final String ENQUE_QUEUE_NAME = "enqueQueue";
private static final String DEQUE_QUEUE_NAME_KEY = "dequeQueueKey";
private static final String DELAY_EXCHANGE="exchange_delay";
//死信路由
@Bean
DirectExchange exchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
//用于延时消费的队列
@Bean
public Queue dequeQueue() {
Queue queue = new Queue(DEQUE_QUEUE_NAME,true,false,false);
return queue;
}
//绑定exchange 到出队队列
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(dequeQueue()).to(exchange()).with(DEQUE_QUEUE_NAME_KEY);
}
//配置死信队列,即入队队列
@Bean
public Queue deadLetterQueue() {
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl", 20000);
args.put("x-dead-letter-exchange", DELAY_EXCHANGE);
args.put("x-dead-letter-routing-key", DEQUE_QUEUE_NAME_KEY);
return new Queue(ENQUE_QUEUE_NAME, true, false, false, args);
}
package com.pollyduan.rabbitmq;
import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private AmqpTemplate rabbitTemplate;
@GetMapping("send")
public String send() {
String msg="hello-001";
System.out.println("发送消息:"+LocalDateTime.now().toString()+" 内容:"+msg);
rabbitTemplate.convertAndSend("enqueQueue", msg);
return "ok";
}
}
package com.pollyduan.rabbitmq;
import java.time.LocalDateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageHandler {
@RabbitListener(queues = "dequeQueue")
public void process(String msg) {
System.out.println("接收消息:"+LocalDateTime.now().toString()+" 内容:"+msg);
}
}
访问入队实例地址: http://localhost:8080/send
查看运行日志,可以看到:
发送消息:2018-12-08T19:18:05.730 msg内容:hello-001
观察日志界面不动,随后会打印一条:
接收消息:2018-12-08T19:18:25.762 接收内容:hello-001
比较两条日志的时间差:20 秒,测试无误。
单条消息发送时也可以指定本消息的过期时间,那么队列过期时间和消息过期时间同时配置的时候,以时间短的限制为准。
我们要在消息中附带一些参数,使用String 类型消息就不够了。
修改出队服务:
package com.pollyduan.rabbitmq;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageHandler {
@RabbitListener(queues = "dequeQueue")
public void process(Message message) throws UnsupportedEncodingException {
String msg=new String(message.getBody(),"UTF-8");
System.out.println("接收消息:"+LocalDateTime.now().toString()+" 接收内容:"+msg);
}
}
修改入队接口:
package com.pollyduan.rabbitmq;
import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private AmqpTemplate rabbitTemplate;
@GetMapping("send")
public String send() throws UnsupportedEncodingException {
Message message = MessageBuilder.withBody("单独指定过期时间的消息".getBytes("UTF-8")).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
rabbitTemplate.convertAndSend("enqueQueue", message);
System.out.println("发送消息:"+LocalDateTime.now().toString()+" 内容:"+new String(message.getBody(),"UTF-8"));
return "ok";
}
}
那么队列的过期时间是20秒,这里设置的消息过期时间是10 秒,测试就会发现10秒的时候,消息就被消费掉了。
同样,修改消息过期时间为30秒,则会在20秒的时候消息过期。
如果为单条消息设置过期,实际上是不可靠的。比如:
消息1: hello-001 过期时间:10 秒 消息2: hello-002 过期时间:5 秒
我们希望的是:消息2 先过期,消息1 后过期,那么预期的目标是在 dequeue里先拿到 消息2
,后拿到 消息1
。经过测试你会发现这是做不到的。
因为实际上enqueue 还是按照FIFO顺序处理的,就是说,直到 消息1
到期,判断为死信,处理;然后才会处理 消息2
,因此即便后发的消息过期时间短,也不会被提前处理。
综上,为单条消息设置过期时间是不可靠的。优先选择使用队列的延迟机制。
订单到期自动取消 消息延时同步 延迟检查状态