大家好,我是工藤学编程 🦉 | 一个正在努力学习的小博主,期待你的关注 |
|---|---|
实战代码系列最新文章😉 | C++实现图书管理系统(Qt C++ GUI界面版) |
SpringBoot实战系列🐷 | 【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案 |
分库分表 | 分库分表之实战-sharding-JDBC分库分表执行流程原理剖析 |
消息队列 | 深入浅出 RabbitMQ-消息可靠性投递 |
前情摘要:
1、深入浅出 RabbitMQ-核心概念介绍与容器化部署 2、深入浅出 RabbitMQ-简单队列实战 3、深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略) 4、深入浅出 RabbitMQ-交换机详解与发布订阅模型实战 4、深入浅出 RabbitMQ-路由模式详解 5、深入浅出 RabbitMQ - 主题模式(Topic) 6、深入浅出 RabbitMQ - SpringBoot2.X整合RabbitMQ实战 8、深入浅出 RabbitMQ-消息可靠性投递
在分布式系统中,RabbitMQ作为常用的消息中间件,核心作用是“可靠传递消息”。但实际开发中,我们常遇到“消息发了没处理”“处理一半服务器挂了消息丢了”的问题——而消息确认机制(ACK) 就是RabbitMQ解决“消息可靠性”的关键方案。今天就从原理讲解到代码实战,把ACK机制讲透。
消费者从RabbitMQ的Broker(消息代理)中监听消息时,存在两个关键风险:
如果没有ACK机制,RabbitMQ会默认“消息投递到消费者就算成功”,直接把消息从队列删除——这就会导致上述场景下“消息没处理完却丢了”。
而ACK机制的核心逻辑是:消费者处理完消息后,主动给RabbitMQ发一个“确认信号(ACK)”,RabbitMQ只有收到这个信号,才会真正删除消息。
如果消息被消费者接收后未发送ACK,它会处于“Unacked”状态——这个状态下,RabbitMQ不会把消息重新投递给其他消费者,也不会删除,直到:
RabbitMQ的ACK确认机制默认是“自动确认”,但实际开发中“手动确认”才是保障可靠性的常用方案,两者的区别和适用场景如下:
逻辑:消费者处理完消息(业务逻辑执行完毕,比如数据库插入成功、接口调用完成)后,手动调用API发送ACK,RabbitMQ再删除消息。
优势:完全由开发者控制“何时确认消息”,避免因处理失败导致的消息丢失。
配置方式(Spring Boot项目):
在application.yml中开启手动确认,核心是配置acknowledge-mode: manual:
spring:
rabbitmq:
host: 你的RabbitMQ地址
port: 5672
username: 用户名
password: 密码
# 消费者监听配置:开启手动确认
listener:
simple:
acknowledge-mode: manual # 手动确认模式
prefetch: 1 # 可选:每次只拉取1条消息,处理完再拉取下一条,避免Unacked消息堆积讲完原理,进入代码实战。这部分要重点理解DeliveryTag(消息投递序号),以及basicAck(确认成功)、basicNack(批量拒绝)、basicReject(单个拒绝)三个核心API。
每个消费者通过Channel(信道)与RabbitMQ通信,而每个Channel的消息投递序号(DeliveryTag)是独立的——从1开始,每次消费者接收消息(或消息重新投递),DeliveryTag都会递增。
它的核心作用是:消费者确认/拒绝消息时,必须通过DeliveryTag告诉RabbitMQ“我要操作哪条消息”,避免“认错消息”。
比如:消费者第一次接消息,DeliveryTag=1;处理失败重新投递,DeliveryTag=2;再失败再投,DeliveryTag=3,以此类推。
以Spring Boot项目为例,我们通过@RabbitHandler监听消息,核心是注入Channel(RabbitMQ通信信道)和Message(消息对象),获取DeliveryTag并调用确认API。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "coupon.release.queue") // 监听的队列名
public class CouponReleaseConsumer {
// 处理消息的核心方法:body是消息内容,message是消息对象,channel是通信信道
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
// 1. 获取当前消息的DeliveryTag(关键:唯一标识当前消息)
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("当前消息投递序号(DeliveryTag):" + deliveryTag);
System.out.println("消息内容:" + body);
System.out.println("消息完整信息:" + message.toString());
try {
// 2. 核心业务逻辑:比如“释放用户优惠券”
processCouponRelease(body); // 自定义业务方法,如操作数据库、调用接口
// 3. 业务处理成功:手动发送ACK,告知RabbitMQ删除消息
// 参数1:deliveryTag:当前消息的投递序号
// 参数2:multiple:是否“批量确认”(false=仅确认当前消息,true=确认当前及之前所有未确认的消息)
channel.basicAck(deliveryTag, false);
System.out.println("消息处理成功,已发送ACK,RabbitMQ将删除该消息");
} catch (Exception e) {
// 4. 业务处理失败:根据场景决定“重新入队”还是“拒绝入队”
System.err.println("消息处理失败,body:" + body + ",错误信息:" + e.getMessage());
// 先获取当前消息的重试次数(自定义header,记录重试次数)
Integer retryCount = message.getMessageProperties().getHeader("retryCount");
if (retryCount == null) retryCount = 0;
if (retryCount < 3) { // 重试阈值:最多重试3次
// 重试:拒绝消息但允许重新入队,同时更新重试次数
message.getMessageProperties().setHeader("retryCount", retryCount + 1);
// 参数1:deliveryTag:当前消息序号
// 参数2:multiple:是否批量拒绝
// 参数3:requeue:是否重新入队(true=放回队列,等待再次消费;false=拒绝入队)
channel.basicNack(deliveryTag, false, true);
System.out.println("重试次数:" + (retryCount + 1) + ",消息已重新入队");
} else {
// 超过重试阈值:拒绝入队,同时记录异常消息到数据库(方便人工审核)
channel.basicNack(deliveryTag, false, false);
saveFailedMessage(body, e.getMessage()); // 自定义方法:将异常消息存库
System.out.println("超过重试阈值(3次),消息已拒绝入队,已记录到异常表");
}
}
}
// 模拟:业务处理方法(释放优惠券)
private void processCouponRelease(String body) {
// 这里写实际业务逻辑:比如解析body中的用户ID、优惠券ID,更新数据库状态等
// if (数据库连接失败) throw new RuntimeException("数据库异常");
}
// 模拟:异常消息记录方法
private void saveFailedMessage(String body, String errorMsg) {
// 实际开发中:将body(消息内容)、errorMsg(错误信息)、时间戳等存入数据库
// 示例SQL:insert into failed_message (content, error_msg, create_time) values (?, ?, now())
}
}上面代码中用到了basicAck(确认成功)、basicNack(拒绝)、basicReject(拒绝),三者的区别是高频考点,用表格清晰对比:
方法名 | 作用 | 支持批量操作 | 支持“重新入队” | 关键参数说明 |
|---|---|---|---|---|
basicAck | 确认消息处理成功 | 是(multiple=true) | - | multiple:true=确认当前及之前所有未确认消息 |
basicNack | 拒绝消息(可批量) | 是(multiple=true) | 是(requeue) | requeue:true=重新入队,false=拒绝入队 |
basicReject | 拒绝消息(仅单条) | 否(仅1条) | 是(requeue) | 无multiple参数,只能拒绝当前DeliveryTag的消息 |
basicNack(deliveryTag, true, false)一次性拒绝10条;basicReject因不支持批量,实际开发中用得少,优先用basicNack;requeue=false,消息会被RabbitMQ删除(或进入死信队列,需额外配置),所以一定要先记录异常消息,避免丢失。acknowledge-mode: manual
如果开启了手动确认,但代码中没调用basicAck/basicNack,消息会一直处于“Unacked”状态,导致队列堵塞(新消息无法被消费)——排查时可通过RabbitMQ管理界面(Queues -> 队列名)查看“Unacked”数量。
InvalidDeliveryTagException。
requeue=true),会导致消息反复入队、消费,占用Broker资源——建议设3-5次重试,超过后记录异常人工处理。
try-catch,避免未捕获异常导致“不发ACK”;失败消息要走“重试+记录”流程,形成闭环。basicNack批量拒绝,单个失败用basicNack单条处理;重试阈值设3-5次,避免资源浪费。觉得有用请点赞收藏! 如果有相关问题,欢迎评论区留言讨论~