Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >使用消息中间件时,如何保证消息仅仅被消费一次?

使用消息中间件时,如何保证消息仅仅被消费一次?

原创
作者头像
平头哥的技术博文
修改于 2020-03-12 02:23:36
修改于 2020-03-12 02:23:36
9920
举报

消息中间件使用广泛,常用来削峰填谷、系统解耦、异步处理。异步处理可能是使用的最多的场景了,比如现在的技术博客网站,都采用积分制,用户发表一篇文章后,可以获取想要的积分,为了提升系统的性能,给用户加积分的操作可以异步处理,并不需要放在同步流程中。

我们可以把用户ID,需要增加的积分封装成一条消息投递到消息系统中,异步处理加积分操作,由于这是发生在不同服务器之间,消息有可能投递失败、处理失败等问题,从而导致用户加积分失败,还有一种可能是消息重复投递,那么用户就有可能重复加积分,不管出现那种情况,都是不正常的情况。

要避免上面的两种情况,就需要我们尽量保证消息不丢失和消息只被消费一次,这篇文章抛开具体的消息中间件,从消息系统的通用层面上,谈谈如何避免这两种情况。

1、保证消息不丢失

一条消息从生产到消费这条链路中,有三个地方可能会造成消息丢失,分别如下:

  • 消息从生产者写入到消息队列的过程投递失败。
  • 消息在消息队列中,持久化失败。
  • 消息被消费者消费的过程出现异常。

1.1 在消息生产的过程中投递失败

消息生产者和消息系统一般都是独立部署在不同的服务器上,两台服务器之间要通信就要通过网络来完成,网络是不稳定,可能会发生抖动,那么数据就有可能丢失。网络发生抖动会有以下两种情况。

在消息生产的过程中丢失消息
在消息生产的过程中丢失消息

情景一:消息在传送给消息系统的过程中发生网络抖动,数据直接丢失。

情景二:消息已经到达消息系统,但是在消息系统给生产者服务器返回信息时,网络发生抖动,此时的数据不一定真正的丢失,很可能只是生产者认为数据丢失

针对消息在消息生产时丢失,可以采取重投机制,当程序检测到网络异常时,将消息再次投递到消息系统。但是重新投递在情景二情况下,可能造成数据重复,如何解决这个问题,在后面会提到。

1.2 在消息队列中持久化失败

消息系统是可以对消息进行持久化,一般都是将消息存储到本地磁盘中,当然也有少数消息中间件支持将数据持久化到数据库中,那么消息系统的性能可能就会下降。

如果你对 Redis 的持久化有一定的了解话,你会发现 Redis 在持久化数据时并不是每新增一条就立即存入到本地磁盘,而是会将数据先写入到操作系统的 Page Cache 中,当满足一定条件时,再将 Page Cache 中的数据刷入磁盘,因为这样可以减少对磁盘的随机 I/O 操作,我们知道随机 I/O 是非常耗时的,这样也提高了系统性能,消息中间件也不例外,在持久化时也是采用这种方式。

在某些极端情况下,可能会造成 Page Cache 中的数据丢失,比如突然停电或者机器异常重启操作。要解决 Page Cache 中数据丢失问题,可以采用集群部署的方式,来尽量保证数据不丢失。

1.3 在消费的过程中存在消息丢失

消息在消费过程中也是会发生丢失的,而且在消费过程中丢失的概率要比前两种情况大很多。一条消息消费过程大概分成三步:消费者拉取消息,消费者处理消息,消息系统更新消费进度。

图片描述
图片描述

第一步在拉取消息的时候可能发生网络抖动异常,第二步在处理消息的时候可能发生一些业务异常,而导致流程并没有走完,如果在第一步、第二步发生异常的情况下,通知消息系统更新消费进度,那么这条失败的消息就永远不会在被处理了,自然就丢失了,其实我们的业务并没有跑完。

要避免消息在消费时丢失的情况,可以在消息接收和处理完成之后才更新消费进度,但是在极端的情况下,会出现消息重复消费的问题,比如某一条消息在处理完成之后,消费者宕机了,这时还没有更新消费进度,消费者重启后,这条消息还是会被消费到。

2、如何保证消息只被消费一次

消息系统本身不能保证消息仅被消费一次,因为消费本身可能重复、下游系统启动拉取重复、失败重试带来的重复、补偿逻辑导致的重复都有可能造重复消息,要保证消息仅被消费一次可以利用等幂性来实现

等幂是数学上的一个概念,就是多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

从等幂的概念上就可以看出来,就算消息执行多次也不会对系统造成影响,那么在使用消息系统时如何保证等幂性呢?因为生产者和消费者都有可能产生重复消息,所以要在生产者和消费者两端都保证等幂性。

保证生产者等幂性,在生产消息的时候,利用雪花算法给消息生成一个全局 ID,在消息系统中维护消息已 ID 映射关系,如果在映射表中已经存在相同 ID,这丢弃这条消息,虽然消息被投递了两次,但是实际上就保存了一条,避免了消息重复问题。

生产者等幂性跟所选者的消息中间件有关系,因为绝大数情况下消息系统不需要我们自己实现,所以等幂性是不太好控制的,消费者等幂性才是我们开发人员控制的重点方向

在消费者端可以从通用层和业务层两个方面来做等幂操作,取决于我们的业务要求。

在通用层面中,利用好消息生成是产生的全局唯一ID,消息被处理成功后,把这个全局 ID 存入到数据中,在处理下一条消息之前,先从数据库中查询这个全局 ID 是否存在,如果已经存在,则直接放弃该消息。

利用这个全局唯一ID就实现了消息等幂性,伪代码如下:

代码语言:txt
AI代码解释
复制
boolean isIDExisted = selectByID(ID); // 判断ID是否存在
if(isIDExisted) {
  return; //存在则直接返回
} else {
  process(message); //不存在,则处理消息
  saveID(ID);   //存储ID
}

但是在极端情况下,这种方式还是会出问题,如果消息在处理之后,还没来得及保存到数据库,消费者就宕机重启了,重启之后还会再次获取该消息,执行时查询该消息并未被消费过,还是会执行两次消费。可以引入数据库事务来解决这个问题,但是会降低系统性能。如果对消息重复消费没有特别严格要求的话,直接使用这种没有引入事务的通用方案就好了,毕竟这也是极小概率的事情。

在业务层面上,我们可选择性就变多了,比如乐观锁、悲观锁、内存去重(https://github.com/RoaringBitmap/RoaringBitmap)等方法。

我们拿乐观锁来举例,比如我们要给一个用户加积分,因为加积分操作并不需要放在主业务中,所以就可以使用消息系统来异步通知,要使用乐观锁,就需要给积分表添加一个版本号字段。并且在生产消息的时候先查询这个账号的版本号并且连同消息一起发送到消息系统中。

图片描述
图片描述

消费者拿到消息和版本号后,在执行更新积分操作的 SQL 时带上版本号,类似于:

代码语言:txt
AI代码解释
复制
update score set score = score + 20, version=version+1 where userId=1 and version=1;

这条消息消费成功后,version 就变成了 2,那么如果有重复的 version=1 的消息再次被消费者拉取到,SQL 语句并不会执行成功,从而保证了消息的幂等性。

要保证消息仅被消费一次,我们需要把重点放在消费者这一段,利用等幂性来保证消息被消费一次。

今天站在消息中间件的通用层面上,聊了聊如何保证数据不丢失和仅被消费一次,希望今天的文章对您的学习或者工作有所帮助,如果您认为文章有价值,欢迎点个赞,谢谢。

最后

目前互联网上很多大佬都有消息中间件相关文章,如有雷同,请多多包涵了。原创不易,码字不易,还希望大家多多支持。若文中有所错误之处,还望提出,谢谢。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【百面成神】消息中间件基础7问,你能撑到第几问
其实就是一个以队列作为消息通信的组件,本质上是一个消息转发器。可以对消息进行接收、存储和消费。当前业界比较流行的消息中间件有rabbitmq,rocketmq和Kafka。我用的比较多的是rabbitmq。
半旧518
2023/10/17
3670
【百面成神】消息中间件基础7问,你能撑到第几问
消息中间件
https://www.open-open.com/lib/view/open1421150566328.html
Yano_nankai
2021/01/26
1K0
消息中间件
消息中间件面试题31道RabbitMQ+ActiveMQ+Kafka
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
程序员追风
2019/07/26
1.2K0
消息中间件面试题31道RabbitMQ+ActiveMQ+Kafka
消息中间件MQ与RabbitMQ面试题(2020最新版)
Java面试总结汇总,整理了包括Java基础知识,集合容器,并发编程,JVM,常用开源框架Spring,MyBatis,数据库,中间件等,包含了作为一个Java工程师在面试中需要用到或者可能用到的绝大部分知识。欢迎大家阅读,本人见识有限,写的博客难免有错误或者疏忽的地方,还望各位大佬指点,在此表示感激不尽。文章持续更新中…
全栈程序员站长
2022/08/11
3300
消息中间件MQ与RabbitMQ面试题(2020最新版)
如何保障消息中间件100%消息投递成功?如何保证消息幂等性?
我们小伙伴应该都听说够消息中间件MQ,如:RabbitMQ,RocketMQ,Kafka等。引入中间件的好处可以起到抗高并发,削峰,业务解耦的作用。
java架构师
2019/05/15
4980
如何保障消息中间件100%消息投递成功?如何保证消息幂等性?
消息中间件系列第3讲:使用消息队列需要考虑的几个问题
放在消息队列中,消息幂等性的意思是:一条完全一样的消息,它消息一次和消费无数次的结果是一样的。
陈树义
2019/02/13
6880
消息队列 rabbitmq面试题(中间件面试题)
解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
全栈程序员站长
2022/07/30
4360
消息队列 rabbitmq面试题(中间件面试题)
如何保障消息中间件100%消息投递成功?如何保证消息幂等性?
我们小伙伴应该都听说够消息中间件MQ,如:RabbitMQ,RocketMQ,Kafka等。引入中间件的好处可以起到抗高并发,削峰,业务解耦的作用。
Java高级攻城狮
2019/05/06
1K0
如何保障消息中间件100%消息投递成功?如何保证消息幂等性?
Java消息中间件的概述与JMS规范
在很久很久以前,小明隔壁有个姓王的邻居,姑且就叫隔壁老王吧。隔壁老王有个大女儿,名叫王兰花秀丽,秀丽从小就爱听老王讲睡前故事,每晚在入睡前都要老王讲了睡前故事才能睡的得着。但某一天秀丽到了外地去上大学,老王为了能给秀丽讲故事,只能通过打电话的方式进行,如下:
端碗吹水
2022/01/12
6600
Java消息中间件的概述与JMS规范
消息中间件选型分析——从Kafka与RabbitMQ的对比来看全局
消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。
吴生
2018/04/16
2K0
消息中间件选型分析——从Kafka与RabbitMQ的对比来看全局
IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?
在IM这种讲究高并发、高消息吞吐的互联网场景下,MQ消息中间件是个很重要的基础设施,它在IM系统的服务端架构中担当消息中转、消息削峰、消息交换异步化等等角色,当然MQ消息中间件的作用远不止于此,它的价值不仅仅存在于技术上,更重要的是改变了以往同步处理消息的思路(比如进行IM消息历史存储时,传统的信息系统作法可能是收到一条消息就马上同步存入数据库,这种作法在小并发量的情况下可以很好的工作,但互联网大并发环境下就是灾难)。
用户1212940
2022/04/13
1.1K1
IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?
消息中间件基础知识-从RabbitMQ、RocketMQ、Kafka到Pulsar
本文梳理笔者的MQ知识,从消息中间件的基础知识讲起,在有了基础知识后,对市面上各主流的消息中间件进行详细的解析,包括 RabbitMQ、RocketMQ、Kafka、Pulsar,最后再横向对比这几款主流的消息中间件。
知码行者
2023/04/03
9410
分布式高可靠消息中间件-Hippo
前言 随着大数据产品的日渐丰富以及数据应用场景需求的增加,TDBank作为腾讯大数据平台的数据接入环节的位置也越发显得重要(见下图)。截止目前为止TDBank日均接入数据已经超过2W亿条每天(约600TB/天),并且数据量还在持续不断上升。Tube 作为整个数据接入体系的存储层发挥着重要作用。Tube作为一个面向高吞吐高性能的分布式消息中间件,其性能及稳定性在万亿级数据体量下经受住了考验。 但对于一些高价值、高敏感度的数据时,我们亟需一个高可靠高可用的消息中间件。(Tube在极端场景下,比如物理宕机无法恢
腾讯大数据
2018/01/26
2.2K1
IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?
在IM这种讲究高并发、高消息吞吐的互联网场景下,MQ消息中间件是个很重要的基础设施,它在IM系统的服务端架构中担当消息中转、消息削峰、消息交换异步化等等角色,当然MQ消息中间件的作用远不止于此,它的价值不仅仅存在于技术上,更重要的是改变了以往同步处理消息的思路(比如进行IM消息历史存储时,传统的信息系统作法可能是收到一条消息就马上同步存入数据库,这种作法在小并发量的情况下可以很好的工作,但互联网大并发环境下就是灾难)。
JackJiang
2018/08/29
2.1K0
30分钟带你了解「消息中间件」Kafka、RocketMQ
https://www.open-open.com/lib/view/open1421150566328.html
Yano_nankai
2021/02/01
5470
30分钟带你了解「消息中间件」Kafka、RocketMQ
【系统架构】消息中间件的架构和原理
消息中间件的作用就是用来异步化并发能力的一个载体,不仅如此,它仍然需要在架构上保证很多能力,高可用,高并发,可扩展,可靠性,完整性,保证顺序等,光是这些都已经让各种设计者比较头疼了; 更有一些变态的需求,例如慢消费,不可重复等需要花的设计代价是相当高的,所以不要盲目的迷信开源大牛,对于很多机制,几乎都要重建;建立一个符合所有业务,好用,通用的私有云,没那么简单。
一个会写诗的程序员
2020/04/16
6250
软件架构-rocketmq之初识消息中间件
1.Provider提供方:服务提供者。2.Producer生产者:创建和发送JMS消息的客户端。3.Consumer消费者:接收JMS消息的客户端。4.Client客户端:生产或消费消息的应用&进程。5.Message消息:服务端与客户端之间的传输数据对象。6.Queue队列 :包含待读取消息的准备区域(点对点)。7.Topic主题:发布消息的分布机制(发布&订阅)。
IT架构圈
2021/10/11
6310
你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你
昨天我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练(秒杀系统每秒上万次下单请求,我们该怎么去设计),知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合。
架构师修炼
2020/07/20
6.9K0
你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你
IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?
1、前言 在IM这种讲究高并发、高消息吞吐的互联网场景下,MQ消息中间件是个很重要的基础设施,它在IM系统的服务端架构中担当消息中转、消息削峰、消息交换异步化等等角色,当然MQ消息中间件的作用远不止
用户1263954
2018/06/22
1.8K0
高并发场景下,如何保证生产者投递到消息中间件的消息不丢失?
如果投递出去的消息在网络传输过程中丢失,或者在RabbitMQ的内存中还没写入磁盘的时候宕机,都会导致生产端投递到MQ的数据丢失。
java架构师
2019/06/11
9540
高并发场景下,如何保证生产者投递到消息中间件的消息不丢失?
推荐阅读
相关推荐
【百面成神】消息中间件基础7问,你能撑到第几问
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文