前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >回答面试官:如何保证消息不丢失

回答面试官:如何保证消息不丢失

作者头像
Java宝典
发布2021-11-09 16:04:48
5380
发布2021-11-09 16:04:48
举报
文章被收录于专栏:JAVA 框架/源码学习

先捞一捞之前的文章

rocketmq是阿里开源的一个性能很强大的消息队列,很多公司都在用,而且经历了多次双十一的洗礼,支持多种特性

对于这个技术点不知道大家掌握的如何了,消息队列现在应该是公司必备的技能之一了,无论是RabbitMQ还是rocketmq,或者支持大数量的kafka

今天我们要说的一个问题,是rocketmq如何保证消息的不丢失??

不知道大家对于这个问题遇到过没有,或者大家听到这个问题的第一反应是什么,应该如何做,如何避免消息丢失,一起来看看

首先我们知道rocketmq的一个消息从生产到最终的消费过程需要经历总共三个阶段,或者说会经过三个地方,分别是producer的发送端、broker的持久化机制、以及consumer的消费端

从生产者producer的角度:消息生产之后传递到broker,如果消息未能正确的存储到broker中,算作消息丢失

从broker的角度:消息默认保存到broker的内存中,异步保存到磁盘上,如果发生宕机、磁盘崩溃会造成消息丢失

从消费者consumer的角度:消息完成了持久化之后,consumer拉取之后未能成功消费且未反馈给broker,这样算作消息丢失,可能消费过程异常或者网络抖动造成消息丢失

生产者角度:消费生产之后传递到broker,如果消息未能正确的保存到broker中,算作消息丢失

从生产者的角度,生产了消息就是要通过网络发送到broker,其实只需要保证一点,就是确认这个消息已经成功发送到broker上了

生产者只需要接收发送消息返回的确认响应即可,就可以代表消息发送成功

代码示例:

代码语言:javascript
复制
DefaultMQProducer mqProducer=new DefaultMQProducer("test");
// 设置 nameSpace地址
mqProducer.setNamesrvAddr("namesrvAddr");
mqProducer.start();
Message msg = new Message("topic" /* Topic */,
        "Captain".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到Broker
try {
    SendResult sendResult = mqProducer.send(msg);
} catch (Exception e) {
    e.printStackTrace();
}

当然,发送消息也分为同步和异步两种,消息发送成功之后会返回下面这四种不同的响应状态

SEND_OK

消息发送成功,但是也并不意味这完全代表不会丢失消息,这还要取决于broker的刷盘方式

这个下面在broker方面会说,需要启动SYNC_MASTER或SYNC_FLUSH。(也就是同步)

FLUSH_DISK_TIMEOUT

如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,得到这个状态

也就是此时刷盘超时,未能在规定时间内落到硬盘上,检查设置是否合理、硬盘大小等情况

FLUSH_SLAVE_TIMEOUT

如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,返回这个结果

主从同步时间默认也是5秒,需要完成主从同步,这个下面在说broker的时候也会说到,你想啊,要是master挂了或者磁盘崩溃了,这是不是也不能百分百保证消息不丢失

SLAVE_NOT_AVAILABLE

如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,返回这个状态

这个在保证消息绝对不丢失的情况下是不允许存在的,这个状态也是属于需要处理的,没有可靠的slave,也就意味着没有可靠的备份数据,所以这种情况也需要考虑

另外呢,上面还说了一种异步的消息发送方式,这种一般是用于链路较长,对于时间比较敏感的业务

这种情况下需要特别注意的就是我们需要设置消息发送完成的回调,这样才能更好的保证消息不丢失

采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的

可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失

broker:消息默认保存到broker的内存中,异步保存到磁盘上,如果发生宕机、磁盘崩溃会造成消息丢失

顺序消费这个场景其实不是特别的常见,但是也是必不可少的,因为在某些业务场景下顺序是很关键的,保证消息的消费顺序也是很关键

消息到了broker之后,默认是优先保存到broker的内存中,然后立刻返回响应给生产者producer,然后broker自己定期将消息批量的异步的保存到硬盘上

有的小伙伴一小子就发现了问题不是那么简单,消息来了之后还没保存到硬盘,就直接返回了,broker直接宕机崩溃了,那这消息岂不无迹可寻了

这样的优点是提高交互的效率,同时减少IO的次数,问题就是会造成消息丢失

如果我们想要保证消息不丢失,那就需要保证消息成功保存到broker之后才可以返回,只需要将消息的保存机制修改为同步刷盘的方式,也就是只有消息保存到broker的磁盘成功之后,才会返回响应

代码语言:javascript
复制
## 默认情况为 ASYNC_FLUSH 
flushDiskType = SYNC_FLUSH

如果broker未能在规定的同步时间(默认5秒)完成刷盘,将返回FLUSH_DISK_TIMEOUT给生产者

上面也介绍了这个了FLUSH_DISK_TIMEOUT

一般在系统中为了保证可用性,broker通常采用的都是一主master多从slave的部署方式,属于集群部署

为了保证消息不丢失,消息需要复制到slave节点,其实默认的情况下,消息写入到broker之后就会返回成功

但是!如果master突然宕机或者磁盘崩溃了,那么这个消息就彻底丢失了,没有备份,所以呢,这里还需要把master和slave的异步复制改成同步复制

代码语言:javascript
复制
## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

也就是只有slave也刷盘到磁盘成功之后,才会给producer返回成功

当然你要这里说,master和slave也可能同时宕机,同时磁盘崩溃,那最终还是无法满足百分百保证消息的不丢失

这种问题啊,其实就像是TCP的三次交互一样,三次交互之后一定保证客户端和服务端通信成功了吗,答案是不一定

我们只能在有限的资源下尽量的去满足系统的稳定性

consumer:消息完成了持久化之后,consumer拉取之后未能成功消费且未反馈给broker,这样算作消息丢失,可能消费过程异常或者网络抖动造成消息丢失

消费者从broker拉取消息,然后进行相应的业务的消费,消费成功会返回一个消费成功的状态给broker,broker如果没收到确认信息,消费者下次拉取重新拉取该消息

代码语言:javascript
复制
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer自身可以维护一个持久化的offset,对应MessageQueue里面的min offset,标记已经成功消费或者已经成功发回到broker的消息下标

如果consumer消费失败,会把这个消息发回给broker,发回成功后,更新自己的offset

如果发回给broker时,broker挂掉了,那么consumer也会定时重试这个操作

即使consumer和broker一起挂掉了,消息也不会丢失,因为consumer里面的offset会定时持久化,重启之后,继续拉取offset之前的消息到本地,重新消费

佛系求关注

Captain希望有一天能够靠写作养活自己,现在还在磨练,这个时间可能会持续很久,但是,请看我漂亮的坚持

感谢大家能够做我最初的读者和传播者,请大家相信,只要你给我一份爱,我终究会还你们一页情的。

Captain会持续更新技术文章,和生活中的暴躁文章,欢迎大家关注【Java贼船】,成为船长的学习小伙伴,和船长一起乘千里风、破万里浪

哦对了,后续所有的远程文章都会更新到这里

https://github.com/DayuMM2021/Java

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java宝典 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档