RabbitMQ消息投递的路径为:
生产者 ---> 交换机 ---> 队列 ---> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
RabbitMQ的消息可靠性投递是确保消息在生产、传输和消费过程中能够准确、完整地到达目的地的重要机制。以下是关于RabbitMQ消息可靠性投递的一些关键概念和方法:
综上所述,RabbitMQ通过提供消息的确认机制、持久化、重试机制以及confirm和return机制等功能来确保消息的可靠性投递。这些机制共同协作,使得RabbitMQ成为一个高效、稳定且可靠的消息代理软件。接下来详细说明上面这些保证消息投递的可靠性机制:
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
退回模式(return)可以监听消息是否从交换机成功传递到队列。
消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。
首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: guest
password: guest
virtual-host: /
在生产者的配置类创建交换机和队列
@Configuration
public class RabbitMQConfig {
private final String EXCHANGE_NAME = "my_topic_exchange";
private final String QUEUE_NAME = "my_queue";
// 创建交换机
@Bean(EXCHANGE_NAME)
public Exchange getExchange(){
return ExchangeBuilder
// 交换机类型
.topicExchange(EXCHANGE_NAME)
// 是否持久化
.durable(true)
.build();
}
// 创建队列
@Bean(QUEUE_NAME)
public Queue getMessageQueue(){
return new Queue(QUEUE_NAME);
}
// 交换机绑定队列
@Bean
public Binding bindingMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
@Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:
生产者配置文件开启确认模式
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: guest
password: guest
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
生产者定义测试确认模式的回调方法
@Test
public void testConfirm(){
// 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 被调用的回调方法
* @param correlationData 相关配置信息
* @param ack 交换机是否成功收到了消息
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("Confirm接收成功");
}
else {
System.out.println("Confirm接收失败,原因为: "+cause);
}
}
});
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送消息
*/
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","最后发了一个狗头和撤回了一条消息,我没回");
}
运行之后,控制台应该打印出相关的测试信息:如下图:
并且,可以看到管控台也是出现了相关的交换机和队列消息:
退回模式(return)可以监听消息是否从交换机成功传递到队列,
使用方法如下:
生产者配置文件开启退回模式
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: guest
password: guest
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
# 开启回退模式
publisher-returns: true
生产者定义退回模式的回调方法,如何让他发送失败回调方法呢,很简单,只需要放一个不存在的路由键即可,代码如下:
@Test
public void testReturn() {
// 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* @param returnedMessage 失败后将失败信息封装到改参数中
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息对象: "+returnedMessage.getMessage());
System.out.println("错误码: "+returnedMessage.getReplyCode());
System.out.println("错误信息: "+returnedMessage.getReplyText());
System.out.println("交换机: "+returnedMessage.getExchange());
System.out.println("路由键: "+returnedMessage.getRoutingKey());
}
});
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送消息
*/
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","到今天也没有给我发消息");
}
执行后如下图:
如果是已经存在的路由键,则不会执行改回调方法:如下图:
可以看到什么都没有
在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。
自动确认:spring.rabbitmq.listener.simple.acknowledge="none"
手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"
消费者配置开启手动签收
消费者处理消息时定义手动签收的情况,代码如下:
@Component
public class AckConsumer {
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
// 消息投递序号,消息每次投递该值都会+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("成功接收到消息: "+message);
/**
* 参数1:消息投递序号
* 参数2:一次是否可以签收多条消息
*/
channel.basicAck(deliveryTag,true);
} catch (IOException e) {
System.out.println("消息接收失败!");
Thread.sleep(2000);
/**
* 参数1:消息投递序号
* 参数2:一次是否可以签收多条消息
* 参数3:拒签后消息是否可以重回到队列中
*/
channel.basicNack(deliveryTag,true,true);
}
}
}
成功执行后如下图:
那么如何模拟当程序出现bug拒绝签收呢,只需要在try语句加一句int i=1/0即可执行后如下:
可以看得到,一直在打印消息接收失败,那就是因为消费者一直向RabbitMQ接收消息。但是一直处理失败。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。