刚才我们发送消息,不管成功还是失败,都不报错,结果看效果时,发现有的没有发进去,那么如何知道消息是否发送成功呢,RabbitMQ提供了一个消费监视的功能。注意:RabbitMQ发送消息分为2个阶段,消息发送到交互机里面,可以监视,消息由交互机到队列里面,也可以监视。
需要的依赖如下:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置 application.yml,将原来的 properties 改一下后缀名就行,内容如下:
server:
port: 8080
spring:
application:
name: Springboot-RabbitMQ
rabbitmq:
username: user
password: 123456
host: 139.196.183.130
port: 5672
virtual-host: v-it6666
# 开启消息到达交换机的确认机制
publisher-confirm-type: correlated
# 消息由交换机到达队列时失败触发
publisher-returns: true
创建 ConfirmImpl 给 RabbitTemplate 设置消息到达交换机的回调对象,内容如下所示:
/**
* @author BNTang
*/
@Component
public class ConfirmImpl implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmImpl confirmImpl;
@PostConstruct
private void initRabbitTemplate() {
this.rabbitTemplate.setConfirmCallback(confirmImpl);
}
/**
* 消息到达交换机后,该方法会回调
*
* @param correlationData 相关的数据
* @param ack 交换机是否接收成功
* @param cause 如果没有接收成功,返回拒绝的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("交换机接收消息成功");
} else {
System.out.println("交换机接收消息失败,失败原因为:" + cause);
}
}
}
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
this.rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
System.out.println("消息发送成功");
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
创建 ReturnsImpl 给 RabbitTemplate 设置消息到达队列失败后回调对象,内容如下所示:
/**
* @author BNTang
*/
public class ReturnsImpl implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ReturnsImpl returnsImpl;
@PostConstruct
private void initRabbitMQ() {
this.rabbitTemplate.setReturnCallback(returnsImpl);
}
/**
* 当消息到达队列失败时,回调的方法,消息被退回了,我们可以把消息记录下来,分析错误的原因,以后重新发送,这样的话,消息就不会再丢失了
*
* @param message 消息
* @param replyCode 回退的响应码
* @param replyText 响应文本
* @param exchange 该消息来自哪个交换机
* @param routingKey 该消息的路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message);
System.out.println(replyCode + " " + replyText + " " + exchange);
}
}
测试方式,不用启动消费者,然后再去 RabbitMQ 的管理界面中删除对应的交换机,然后在发送消息即可验证。
创建 WatchMessageImpl,内容如下所示:
/**
* @author BNTang
*/
@Component
public class WatchMessageImpl implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void initRabbitTemplate() {
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
}
/**
* 消息到达交换机的回调
*
* @param correlationData
* @param ack 是否到达交换机
* @param cause 如果没有接收成功,返回拒绝的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息正常到达交换机");
} else {
System.out.println("消息没有到达交换机,原因为:" + cause);
}
}
/**
* 当消息到达队列失败时,回调的方法,消息被退回了,我们可以把消息记录下来,分析错误的原因,以后重新发送,这样的话,消息就不会再丢失了
*
* @param message 消息体
* @param replyCode 回退的响应码
* @param replyText 响应文本
* @param exchange 该消息来自哪个交换机
* @param routingKey 该消息的路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(new String(message.getBody()) + "到达队列失败:" + replyCode + " " + replyText + " 交换机为:" + exchange + " 路由key:" + routingKey);
// 处理重新发送的问题
}
}
可以传自定义对象,但是自定义的对象必须序列化,在实际开发中一般使用 JSON 串去传自定义对象。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有