
可靠性投递(Reliable Delivery)是指确保消息从生产者成功到达消费者,即使面对网络故障、系统崩溃等异常情况也能保证不丢失、不重复、按顺序(部分场景)传递。
// 伪代码示例:生产端确认模式
public void sendWithConfirm(Message msg) {
// 1. 持久化到本地数据库(防丢失)
messageDao.save(msg);
// 2. 发送到消息队列
String msgId = rabbitTemplate.convertAndSend(msg);
// 3. 等待Broker确认
boolean ack = waitForAck(msgId, TIMEOUT);
// 4. 失败重试(指数退避)
if (!ack) {
retryWithBackoff(msg);
}
// 5. 最终记录投递状态
updateDeliveryStatus(msgId, ack);
}
技术要点:
delivery_mode=2
消息处理流程:
Producer → Broker接收 → 持久化存储 → 推送给Consumer → 等待ACK → 删除/重投
持久化策略:
durable=true
delivery_mode=2
// 消费端保证示例
@RabbitListener(queues = "order.queue")
public void handleOrder(OrderMessage order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
// 1. 业务处理
orderService.process(order);
// 2. 手动确认(成功才ACK)
channel.basicAck(tag, false);
// 3. 更新消费记录
consumeRecordService.markConsumed(order.getId());
} catch (Exception e) {
// 4. 失败处理:重试或进入死信队列
if (retryCount < MAX_RETRY) {
channel.basicNack(tag, false, true); // 重入队列
} else {
channel.basicNack(tag, false, false); // 进入死信队列
alarmService.notifyAdmin(order, e);
}
}
}
消费端关键点:
public boolean processWithIdempotent(String msgId) {
// 基于消息ID去重
if (redis.exists("processed:" + msgId)) {
return true; // 已处理过
}
// 业务处理
boolean success = doBusinessLogic();
// 记录处理状态
if (success) {
redis.setex("processed:" + msgId, 24h, "1");
}
return success;
}
两阶段提交:
1. 发送Half Message(预备消息)
2. 执行本地事务
3. 根据本地事务结果Commit/Rollback
4. Broker检查事务状态并投递/丢弃
# 补偿机制实现
def reliable_delivery(message):
max_retries = 5
for attempt in range(max_retries):
try:
# 尝试投递
result = mq_client.send(message)
if result.confirmed:
log_delivery_success(message.id)
return True
except Exception as e:
log_failure(attempt, e)
if attempt == max_retries - 1:
# 最终失败,人工介入
send_alert_to_admin(message)
save_to_compensation_table(message)
return False
# 等待后重试
sleep(backoff_time(attempt))
return False
-- 本地消息表结构
CREATE TABLE local_message (
id BIGINT PRIMARY KEY,
biz_id VARCHAR(64), -- 业务ID
content TEXT, -- 消息内容
status TINYINT, -- 0:待发送, 1:已发送, 2:已确认
retry_count INT,
next_retry_time DATETIME,
created_at TIMESTAMP
);
工作流程:
// 批量消息的可靠性处理
public class BatchMessageReliableSender {
public void sendBatch(List<Message> batch) {
// 1. 批量持久化到本地
batchMessageDao.saveAll(batch);
// 2. 设置批次ID
String batchId = generateBatchId();
// 3. 发送批次消息
boolean success = mqTemplate.sendBatch(batchId, batch);
// 4. 批次确认(或单条补偿)
if (success) {
markBatchDelivered(batchId);
} else {
// 逐条重试或记录异常
compensateFailedMessages(batch);
}
}
}
-- 消息对账SQL示例
SELECT
DATE(create_time) as day,
COUNT(*) as total_sent,
SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) as confirmed,
SUM(CASE WHEN status=1 THEN 1 ELSE 0 END) as pending
FROM message_record
GROUP BY DATE(create_time)
HAVING total_sent != confirmed;
特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
可靠性机制 | 确认+持久化+镜像队列 | 副本机制+ACK+Exactly-Once | 事务消息+本地存储 |
顺序性 | 单队列保证 | Partition内有序 | Queue内有序 |
事务支持 | 轻量级事务(性能差) | 支持Exactly-Once语义 | 完整事务消息 |
最佳适用场景 | 业务消息、高可靠要求 | 日志流、大数据场景 | 金融交易、订单业务 |
# 配置示例:多级降级
mq:
primary:
url: "amqp://primary"
timeout: 1000ms
secondary:
url: "amqp://secondary"
timeout: 2000ms
fallback-to-db: true # 最终降级到数据库
消息的可靠性投递是一个系统工程,需要在生产端、Broker端、消费端协同设计,结合业务场景、性能要求、成本约束做出合适的选择。没有"银弹"方案,只有最适合的方案。建议从简单方案开始,随着业务复杂度增加逐步引入更完善的可靠性机制。
首先,消息可靠性投递指的是: 一个消息从发送到被消费者成功处理,过程中不会丢失或重复,保证最终数据的一致性。在实际系统里,消息可能因为网络问题、服务重启等原因丢失或重复,所以我们需要一套机制来确保可靠。
为什么需要它呢? 比如在订单系统中,用户支付成功后要通知物流系统,如果消息丢了,物流就不会触发,用户体验就受损;如果消息重复,可能重复发货,造成损失。所以像金融、交易这些场景,可靠性特别重要。
常见的实现方式,我了解的有几种:
实际中我们一般会结合业务来设计。 比如一个订单状态同步的场景,我可能会用:生产者确认 + 消息持久化 + 消费者手动ACK + 消费端幂等性。这样基本能覆盖发送、存储、消费各个环节的可靠性。
当然,可靠性和性能之间需要权衡,比如持久化会降低吞吐量,手动ACK会增加延迟。所以要根据业务需求来选择合适的方案。
追加:遇到过消息丢失或重复的问题,你是怎么排查和解决的?
追加:是否了解最终一致性、最大努力通知等模式 ?
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。