大家好,我是工藤学编程 🦉 | 一个正在努力学习的小博主,期待你的关注 |
|---|---|
实战代码系列最新文章😉 | C++实现图书管理系统(Qt C++ GUI界面版) |
SpringBoot实战系列🐷 | 【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案 |
分库分表 | 分库分表之实战-sharding-JDBC分库分表执行流程原理剖析 |
消息队列 | 深入浅出 RabbitMQ - SpringBoot2.X整合RabbitMQ实战 |
前情摘要:
1、深入浅出 RabbitMQ-核心概念介绍与容器化部署 2、深入浅出 RabbitMQ-简单队列实战 3、深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略) 4、深入浅出 RabbitMQ-交换机详解与发布订阅模型实战 4、深入浅出 RabbitMQ-路由模式详解 5、深入浅出 RabbitMQ - 主题模式(Topic) 6、深入浅出 RabbitMQ - SpringBoot2.X整合RabbitMQ实战
在分布式系统中,消息队列作为解耦服务、削峰填谷的核心组件,其消息传递的可靠性直接影响业务稳定性。想象一下,电商订单支付后因消息丢失导致物流系统未触发发货,这样的问题足以让用户流失。今天我们就深入探讨RabbitMQ如何实现消息的可靠性投递,从底层原理到实战代码一网打尽。
消息可靠性投递指的是确保消息从生产者发送到消费者的全链路过程中不丢失,具体包含三层含义:
RabbitMQ的消息投递路径是:生产者 → 交换机(Exchange)→ 队列(Queue)→ 消费者
在这条路径上,有两个最容易发生消息丢失的节点,也是我们需要重点控制的环节:
“为什么消息进入队列后,相对不容易发生丢失?” 或者 “为什么队列到消费者的环节相对不容易丢失消息?”,可以从 RabbitMQ 的设计机制来解释:
当生产者发送消息后,RabbitMQ的Broker节点收到消息会返回一个ACK确认信号。通过ConfirmCallback回调,生产者可以明确知道消息是否成功到达交换机,这是可靠性投递的核心机制。
# 新版配置(推荐)
spring.rabbitmq.publisher-confirm-type: correlated # 消息到达交换机后触发回调
# 旧版配置(已过时)
# spring.rabbitmq.publisher-confirms: true@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testConfirmCallback() {
// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("=====ConfirmCallback触发====");
System.out.println("消息是否到达交换机:" + (ack ? "成功" : "失败"));
System.out.println("失败原因:" + cause);
// 业务处理:失败时可记录日志、执行重试或存入本地消息表
if (!ack) {
log.error("消息发送失败,准备重试: {}", correlationData.getId());
// retryLogic(correlationData);
}
});
// 发送消息
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 消息唯一标识
rabbitTemplate.convertAndSend(
"order.exchange", // 交换机名称
"order.new", // 路由键
"新订单创建:ID=1001", // 消息内容
correlationData
);
}若故意修改交换机名称为不存在的"invalid.exchange",回调会收到ack=false,并返回"channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘invalid.exchange’ in vhost ‘/’, class-id=60, method-id=40)"的错误原因。
消息成功到达交换机后,若因路由键错误、队列不存在等原因导致无法路由到队列,默认情况下消息会被直接丢弃。ReturnCallback机制可以捕获这类路由失败的消息,让生产者有机会处理。
spring.rabbitmq.publisher-returns: true # 开启返回回调
spring.rabbitmq.template.mandatory: true # 强制要求路由失败时返回消息@Test
void testReturnCallback() {
// 设置返回回调
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.out.println("=====ReturnCallback触发====");
System.out.println("状态码:" + returnedMessage.getReplyCode());
System.out.println("路由失败原因:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
System.out.println("消息内容:" + new String(returnedMessage.getMessage().getBody()));
// 业务处理:可记录失败消息,人工介入或重新投递
});
// 发送消息(故意使用错误路由键)
rabbitTemplate.convertAndSend(
"order.exchange",
"invalid.key", // 不存在的路由键
"新订单创建:ID=1002"
);
}开启Confirm和Return机制后,每一条消息都会增加一次网络交互(Broker返回确认),这会导致:
因此,非核心业务消息不建议开启(如日志收集、非关键通知)。对于核心业务(订单、支付),可结合本地消息表实现最终一致性:
RabbitMQ的消息可靠性投递通过两大回调机制实现:
在实际应用中,需根据业务重要性权衡可靠性与性能,核心场景建议结合本地消息表做补偿,非核心场景可适当放宽策略。
掌握这些机制,就能在分布式系统中构建起可靠的消息传递链路,为业务稳定性保驾护航。
觉得有用请点赞收藏! 如果有相关问题,欢迎评论区留言讨论~