延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
RabbitMQ本身并不直接支持延迟队列(Delayed Message Queue)的功能,但你可以通过结合RabbitMQ的TTL(Time-To-Live,消息过期时间)和死信队列(Dead Letter Queue, DLQ)的特性来模拟实现延迟队列的效果。
以下是一个基于RabbitMQ TTL和DLQ实现延迟队列的步骤:
这个队列将用于接收并尝试消费消息。如果消息在一定时间内没有被消费或者消费失败,它们将被发送到死信队列。
这个队列将用于接收来自普通队列的死信消息。可以在这里设置消费者来处理延迟的消息。
需要一个交换机来将消息从生产者路由到普通队列,并且还需要一个交换机(可以是同一个交换机)来将死信从普通队列路由到死信队列。
将普通队列和死信队列分别绑定到相应的交换机上,并设置正确的路由键。具体代码如下:
@Configuration
public class RabbitMQConfig {
private final String ORDER_EXCHANGE = "order_exchange";
private final String ORDER_QUEUE = "order_queue";
private final String EXPIRE_EXCHANGE = "expire_exchange";
private final String EXPIRE_QUEUE = "expire_queue";
// 创建过期交换机
@Bean(EXPIRE_EXCHANGE)
public Exchange getExchange(){
return ExchangeBuilder
// 交换机类型
.topicExchange(EXPIRE_EXCHANGE)
// 是否持久化
.durable(true)
.build();
}
// 创建过期队列
@Bean(EXPIRE_QUEUE)
public Queue getMessageQueue(){
return QueueBuilder
.durable(EXPIRE_QUEUE)
.build();
}
// 创建订单交换机
@Bean(ORDER_EXCHANGE)
public Exchange getExchange1(){
return ExchangeBuilder
// 交换机类型
.topicExchange(ORDER_EXCHANGE)
// 是否持久化
.durable(true)
.build();
}
// 创建订单队列
@Bean(ORDER_QUEUE)
public Queue getMessageQueue2(){
return QueueBuilder
.durable(ORDER_QUEUE)
// 绑定过期交换机
.deadLetterExchange(EXPIRE_EXCHANGE)
// 过期队列路由关键字
.deadLetterRoutingKey("expire_routing")
// 队列每条消息只能存活15s
.ttl(15000)
// 队列最大长度10
.maxLength(10)
.build();
}
// 过期交换机绑定过期队列
@Bean
public Binding bindingMessageQueue5(@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,
@Qualifier(EXPIRE_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("expire_routing")
.noargs();
}
// 订单交换机绑定订单队列
@Bean
public Binding bindingMessageQueue4(@Qualifier(ORDER_EXCHANGE) Exchange exchange,
@Qualifier(ORDER_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order_routing")
.noargs();
}
}
在发送消息到普通队列时,为消息设置一个TTL(Time-To-Live)。当消息在队列中等待的时间超过TTL时,它将被视为死信并被发送到死信队列。
这里在上诉代码中已经设置了15s的存活时间。
使用RabbitMQ的客户端库(如Spring AMQP的RabbitTemplate
)发送消息到普通队列,并设置消息的TTL。
// 下单
@GetMapping("/place/{orderId}")
public String placeOrder(@PathVariable String orderId){
System.out.println(LocalDateTime.now()+"处理订单数据.....");
// 向rabbitmq发送订单id
rabbitTemplate.convertAndSend("order_exchange","order_routing",orderId);
return LocalDateTime.now()+"下单成功,修改库存";
}
这里发送了下单请求
在死信队列的消费者中处理延迟的消息。
@Component
public class ExpireOrderConsumer {
// 消费者2
@RabbitListener(queues = "expire_queue")
public void listenMessage2(String orderId){
System.out.println(LocalDateTime.now()+"查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}
}
访问接口。
这里可以看得到,15s之后监听订单的消息,可以知道订单是否处理。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。