RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。 当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。 死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。 死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:
RabbitMQ的工作模式
死信队列的工作模式 今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。
win10 安装rabbitMQ详细步骤_rabbitmq 安装-CSDN博客 我这里直接引用别人的文章了,下载需要大家去看一看。 RabbitMQ延迟插件的安装。 [超详细]RabbitMQ安装延迟消息插件_rabbitmq安装延迟插件-CSDN博客
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:
在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。
我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。
接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。
// 延迟队列交换器名称
public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
// 延迟队列A名称
public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";
// 延迟队列B名称
public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";
// 延迟队列routingA名称
public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";
// 延迟队列routingB名称
public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";
// 死信队列
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";
public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";
public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";
public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";
// 注册延迟交换器delayExchange
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。
// 注册延迟队列A 还要绑定死信交换器和死信routingA
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String,Object> args = new HashMap<>();
args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);
//args.put("x-message-ttl",6000);
return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
}
// 注册延迟队列B 还要绑定死信交换器和死信routingB
@Bean("delayQueueB")
public Queue delayQueueB(){
Map<String,Object> args = new HashMap<>();
args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);
args.put("x-message-ttl",10000);
return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
}
// 延迟队列A绑定交换器
@Bean
public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){
return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);
}
// 延迟队列B绑定交换器
@Bean
public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){
return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);
}
与延迟队列不同的是,死信队列并没有配置延迟参数。
// 注册死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUE_A_NAME);
}
// 注册死信队列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUE_B_NAME);
}
// 注册死信交换器
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 死信队列A绑定死信交换器
@Bean
public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
}
// 死信队列B绑定死信交换器
@Bean
public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){
return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
}
到此为止,RabbitMQ的组件配置完成。
server:
port: 8081
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。
@Component
public class DeadLetterQueueConsumer {
public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);
@RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
System.out.println(message.getMessageProperties().getDeliveryTag());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。
@Component
public class DelayMessageSender {
@Resource
RabbitTemplate rabbitTemplate;
public void sendMessage(String msg,Integer delayTimes){
switch (delayTimes){
case 6:
rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(String.valueOf(6000));
return message;
}
});
break;
case 10:
rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);
break;
}
}
}
@RestController
@RequestMapping("/student")
public class StudentController {
@Autowired
DelayMessageSender messageSender;
@RequestMapping("/send-message")
public String sendMessage(String msg,Integer delayTimes){
System.out.println(new Date());
messageSender.sendMessage(msg,delayTimes);
return "发送成功";
}
}
在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。
http://localhost:15672/
先来看看我们的初始队列。这里是什么都没有的。
然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。
使用PostMan发送一次请求。
我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。
接下来我们测试10s的延迟队列。
10s后死信队列B成功的接收到了消息。
延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。