引言:
要确保Kafka在使用过程中的稳定性,需要从kafka在业务中的使用周期进行依次保障。主要可以分为:事先预防(通过规范的使用、开发,预防问题产生)、运行时监控(保障集群稳定,出问题能及时发现)、故障时解决(有完整的应急预案)这三阶段。
此外,如果需要对于Kafka的一些基础知识进行补充、学习,可以参考这篇文章:https://cloud.tencent.com/developer/article/2026555
事先预防即通过规范的使用、开发,预防问题产生。主要包含集群/生产端/消费端的一些最佳实践、上线前测试以及一些针对紧急情况(如消息积压等)的临时开关功能。 Kafka调优原则:
根据具体场景(是否允许一定延迟、实时消息、定时周期任务等)区分kafka topic,避免挤占或阻塞实时业务消息的处理。
如果下游消息消费存在瓶颈或者集群负载过高等,需要在生产端(或消息网关)实施流量生产速率的控制或者延时/暂定消息发送等策略,避免短时间内发送大量消息。
手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。
如果需要在保证Kafka在分区内严格有序的话(即需要保证两个消息是有严格的先后顺序),需要设置key,让某类消息根据指定规则路由到同一个topic的同一个分区中(能解决大部分消费顺序的问题)。 但是,需要避免分区内消息倾斜的问题(例如,按照店铺Id进行路由,容易导致消息不均衡的问题)。
消息消费的幂等主要根据业务逻辑做调整。
以处理订单消息为例:
简而言之,即通过Redis做前置处理 + DB唯一索引做最终保证来实现幂等性。
在消息量非常大的情况下,实时和离线消费者同时消费一个集群,离线数据繁重的磁盘 IO 操作会直接影响实时业务的实时性和集群的稳定性。
根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。
注:批量拉取处理时,需注意下kafka版本,spring-kafka 2.2.11.RELEASE版本以下,如果配置kafka.batchListener=true,但是将消息接收的元素设置为单个元素(非批量List),可能会导致kafka在拉取一批消息后,仅仅消费了头部的第一个消息。
A. 触发条件:
B. 如何避免非必要rebalance(消费者下线、消费者主动退出消费组导致的reblance):
一般情况下,还是client 消费 broker 丢消息的场景比较多,想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。
Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了,则此时消息就丢失了。
针对乱序消息:
例如:订单和支付分别封装了各自的消息,但是消费端的业务场景需要按订单消息->支付消息的顺序依次消费消息。
针对顺序消息:
两者都是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。
A. Consumer单线程顺序消费
生产者在发送消息时,已保证消息在分区内有序,一个分区对应了一个消费者,保证了消息消费的顺序性。
B. Consumer多线程顺序消费(具体策略在后面章节)
单线程顺序消费的扩展能力很差。为了提升消费者的处理速度,除了横向扩展分区数,增加消费者外,还可以使用多线程顺序消费。
将接收到的kafka数据进行hash取模(注意:如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。
此外,这里通过配置中心进行开关、动态扩容/缩容线程池。
通过事务消息,可以很好的保证一些业务场景的事务逻辑,不会因为网络不可用等原因出现系统之间状态不一致。
当更新任何一个服务出现故障时就抛出异常,事务消息不会被提交或回滚,消息服务器会回调发送端的事务查询接口,确定事务状态,发送端程序可以根据消息的内容对未做完的任务重新执行,然后告诉消息服务器该事务的状态。
设置多个分区在一定程度上是可以提高消费者消费的并发度,但是分区数量过多时可能会带来:句柄开销过大、生产端占用内存过大、可能增加端到端的延迟、影响系统可用性、故障恢复时间较长等问题。
根据吞吐量的要求设置 partition 数:
调优目标:高吞吐量、低延时。
自上而下分为应用程序层、框架层、JVM层和操作系统层,层级越靠上,调优的效果越明显。
调优类型 | 建议 |
---|---|
操作系统 | 挂载文件系统时禁掉atime更新;选择ext4或XFS文件系统;swap空间的设置;页缓存大小 |
JVM(堆设置和GC收集器) | 将JVM 堆大小设置成 6~8GB;建议使用 G1 收集器,方便省事,比 CMS 收集器的优化难度小 |
Broker端 | 保持服务器端和客户端版本一致 |
应用层 | 要频繁地创建Producer和Consumer对象实例;用完及时关闭;合理利用多线程来改善性能 |
参数列表 | |
---|---|
Broker端 | 适当增加num.replica.fetchers参数值,但不超过CPU核数 |
调优GC参数以避免经常性的Full GC | |
Producer端 | 适当增加batch.size参数值,比如从默认的16KB增加到512KB或1MB |
适当增加linger.ms参数值,比如10~100 | |
设置compression.type=lz4或zstd | |
设置acks=0或1 | |
设置retries=0 | |
如果多线程共享同一个Producer实例,则增加buffer.memory参数值 | |
Consumer端 | 采用多Consumer进程或线程同时消费数据 |
增加fetch.min.bytes参数值,比如设置成1KB或更大 |
参数列表 | |
---|---|
Broker端 | 适当设置num.replica.fetchers值 |
Producer端 | 设置linger.ms=0 |
不启用压缩,即设置compression.type=none | |
设置ackes=1 | |
Consumer端 | 设置fetch.min.bytes=1 |
kafka的稳定性测试主要在业务上线前针对Kafka实例/集群健康性、高可用性的测试。
(1) 检查实例:查看Kafka 实例对象中拿到所有的信息(例如 IP、端口等);
(2) 测试可用性:访问生产者和消费者,测试连接。
步骤:
预期:对生产者和消费者的可用性均无影响。
步骤:
预期:所有broker ready后服务正常。
运行时监控主要包含集群稳定性配置与Kafka监控的最佳实践,旨在及时发现Kafka在运行时产生的相关问题与异常。
合理进行kafka实例配,主要关注这几个数据:
可根据实际业务的消息内容大小、发送消息qps等进行预估,可以尽量设置大点;具体数值可根据实例监控查看,如果短时间内磁盘使用百分比就达到较高值,则需扩容。
峰值带宽=最大生产流量*副本数
消息即使被消费,也会持久化到磁盘存储保留时长的时间。该设置会占用磁盘空间,如果每天消息量很大的话,可适当缩短保留时间。
推荐开启动态保留设置。当磁盘容量达到阈值,则删除最早的消息,最多删除到保底时长范围外的消息(淘汰策略),可以很大程度避免磁盘被打满的情况。
但有调整时不会主动通知,但我们可以通过配置告警感知磁盘容量的变化。
如果不同业务线的 topic 会共享一块磁盘,若某个consumer 出现问题而导致消费产生 lag,进而导致频繁读盘,会影响在同一块磁盘的其他业务线 TP 的写入。
解决:Broker级别物理隔离:创建Topic、迁移Topic、宕机恢复流程
Kafka RPC 队列缺少隔离,一旦某个 topic 处理慢,会导致所有请求 hang 住。
解决:需要按照控制流、数据流分离,且数据流要能够按照 topic 做隔离。
如果一个 topic 出现问题,则只会阻塞其中的一个 RPC 处理线程池,以及 call 队列,可以保障其他的处理链路是畅通的。
整个限速逻辑实现在 RPC 工作线程处理的末端,一旦 RPC 处理完毕,则通过限速控制模块进行限速检测。
监控 | 功能/指标 | 详情 |
---|---|---|
黑盒监控 | 操作 | 主题操作:创建、预览、查看、更新、删除 |
服务 | 数据写入、是否消费成功 | |
系统 | CPU 负载、堆栈信息、连接数等 | |
白盒监控 | 容量 | 总存储空间、已用存储空间、最大分区使用、集群资源、分区数量、主题数量; |
流量 | 消息写入、消费速率、集群网络进出; | |
延迟 | 消息写入、消费耗时(平均值、99分位、最大耗时)、主题消费延迟量(offset lag) | |
错误 | 集群异常节点数量、消息写入拒绝量、消息消费失败量、依赖zookeeper的相关错误 |
针对CKafka,需要配置告警(此类告警一般为消息积压、可用性、集群/机器健康性等检查)。
如:实例健康状态、节点数量、健康节点数量、问题分区数、生产消息数、消费请求数、jvm内存利用率、平均生产响应时间、分区消费偏移量等。
具体指标可以参考:https://cloud.tencent.com/document/product/597/54514
配置文档:https://cloud.tencent.com/document/product/597/57244
选择监控实例,配置告警内容和阈值。
一般会对当前服务自身的kafka集群做告警配置,但是如果是依赖自身消息的下游服务出现消费问题,我们是感知不到了;而且针对消费端服务不共用同一个集群的情况,出现消息重复发送的问题,服务自身是很难发现的。
在业务上线前,最好梳理下自身服务所涉及的topic消息(上游生产端和下游消费端),并细化告警配置,如果出现上游kafka异常或者下游kafka消息堆积可以及时感知。特别需要把可能有瞬时大量消息的场景(如批量数据导入、定时全量数据同步等)做一定的告警或者预案,避免服务不可用或者影响正常业务消息。
通过自建告警平台配置对服务自身的异常告警,其中包括对框架在使用kafka组件时抛出与kafka消费逻辑过程中抛出的业务异常。
其中,可能需要异常升级的情况(由于)单独做下处理(针对spring kafka):
目前业界并没有公认的解决方案,各家都有各自的监控之道。
其中,Kafka Monitor通过模拟客户端行为,生产和消费数据并采集消息的延迟、错误率和重复率等性能和可用性指标,可以很好地发现下游的消息消费情况进而可以动态地调整消息的发送。(使用过程中需注意对样本覆盖率、功能覆盖率、流量、数据隔离、时延的控制)
Kakfa Monitor 优势:
基于Kafka Monitor的设计思想,可以针对业务特点引入对消息的延迟、错误率和重复率等性能的监控告警指标。
防微杜渐,遇到问题/故障时有完整的应急预案,以快速定位并解决问题。
问题描述:消费端产生消息积压,导致依赖该消息的服务不能及时感知业务变化,导致一些业务逻辑、数据处理出现延迟,容易产生业务阻塞和数据一致性问题。
方案:问题排查、扩容升配策略、消息Topic转换策略、可配置多线程的消费策略。
遇到消息积压时,具体可以从以下几个角度去定位问题原因:
对于第1、2点导致的消息积压:为暂时性的消息积压,通过扩分区、扩容升配、多线程消费、批量消费等方式提高消费速度能在一定程度上解决这类问题。
对于第3点导致的消息积压:可以采用消息Topic中转策略。
简而言之,即线程池消费+动态线程池配置策略:将接收到的kafka数据进行hash取模(如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。
设计思路:
另外,可以根据业务流量调整的线程配置与pod的配置,如高峰期设置一个相对较高的并发级别数用来快速处理消息,平峰期设置一个较小的并发级别数来让出系统资源。这里,可以参考美团提供的一种配置中心修改配置动态设置线程池参数的思路,实现动态的扩容或者缩容。
实现了动态扩容与缩容:
此外,还可以新增开关,它设置为true是可以中断启动中的线程池,故障时进行功能开关。
注意: 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。
当消息积压是发生在所有的partition还是所有的partition都有积压情况时,只能操作临时扩容,以更快的速度去消费数据了。
设计思路:
改进:
注意:
问题描述:某个消息消费异常或者某个操作较为耗时,导致单个pod的消费能力下降,甚至产生阻塞。
方案:设置偏移量;开关多线程的消费策略;
1. 调整偏移量:联系运维,将offset后移一位; 2. 消息补推:针对跳过的消息或某个时间段内的数据进行消息补推; 3. 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。
参考上面的“可配置多线程的消费策略”,在发生阻塞时开启多线程消费开关。
注:需要修改代码或者在事前将多线程逻辑写好
问题描述:服务没有按照预期消费到kafka消息,导致业务产生问题
方案:根因分析;消息补推;
(1) 生产端是否成功发送消费(源头丢失)
解决:需要检查生产端与集群健康性;消息补发。
(2) 是否被成功消费
Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了。
此外,如果消费逻辑有bug,也导致消息丢失的假象。
解决:修复问题,视情况修改消费确认机制。
(3) 是否有其他服务共用了同一个消费组
多服务误用同一个消费组会导致消息一定比率或规律性丢失。
例如,创建用户的kafka消息,可能价格中心和促销服务误用了一个消费组,导致每个服务都是消费了部分消息,导致一些问题出现偶现的情况。
解决:修改配置,重启服务,各种建立的消费组;事前需要有检查是否有多个服务共用一个消费的情况(检测+比对);
针对每个对外发送的服务,生产端一般都需要有较为完善的消息补推接口,并且消费端也需要保障消息消费的幂等)
机器、存储和网络
需要重新评估你的实例类型决策:你的集群是否饱和?在什么情况下饱和?是否存在其他实例类型,可能比你第一次创建集群时选择的类型更合适?EBS 优化实例与 GP2/3 或 IO2 驱动器的混合是否真的比 i3 或 i3en 机器(及其带来的优势)有更好的性价比?
压缩在 Kafka 中并不新鲜,大多数用户已经知道了自己可以在 GZIP、Snappy 和 LZ4 之间做出选择。但自从KIP-110被合并进 Kafka,并添加了用于 Zstandard 压缩的压缩器后,它已实现了显著的性能改进,并且是降低网络成本的完美方式。
以生产者端略高的 CPU 使用率为代价,你将获得更高的压缩率并在线上“挤进”更多信息。
Amplitude在他们的帖子中介绍,在切换到 Zstandard 后,他们的带宽使用量减少了三分之二,仅在处理管道上就可以节省每月数万美元的数据传输成本。
不平衡的集群可能会损害集群性能,导致某些 borker 比其他 broker 的负载更大,让响应延迟更高,并且在某些情况下会导致这些 broker 的资源饱和,从而导致不必要的扩容,进而会影响集群成本。
此外,不平衡集群还面临一个风险:在一个 broker 出故障后出现更高的 MTTR(例如当该 broker 不必要地持有更多分区时),以及更高的数据丢失风险(想象一个复制因子为 2 的主题,其中一个节点由于启动时要加载的 segment 过多,于是难以启动)。
定义:
所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。通俗点来讲就是,在消费者收到重复消息进行重复处理时,也要保证最终结果的一致性。
比如,银行转账、下单等,不管重试多少次,都要保证最终结果一定是一致的。
将数据库中的多个字段联合,创建一个唯一约束,即使多次操作也能保证表里至多存在一条记录(如创建订单、创建账单、创建流水等)。
此外,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统(如Redis的SETNX)都可以用于实现幂等消费。
其中,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性。
[1]https://iwiki.woa.com/pages/viewpage.action?pageId=1126809993 [2]https://www.infoq.cn/article/ucSru1uKkSswLXPcjQgC?source=app_share [3]https://blog.csdn.net/qq_32179907/article/details/122599769 [4]https://blog.csdn.net/qq_32179907/article/details/122599769 [5]https://zhuanlan.zhihu.com/p/513559802?utm_source=wechat_session&utm_medium=social&utm_oi=689250073002930176&utm_campaign=shareopn [6]https://blog.csdn.net/philip502/article/details/118997899?utm_medium=distribute.wap_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-118997899-blog-125192952.wap_relevant_multi_platform_whitelistv1&spm=1001.2101.3001.4242.1&utm_relevant_index=1 [7]https://www.zhihu.com/question/483747691/answer/2392949203?utm_source=wechat_session&utm_medium=social&utm_oi=689250073002930176&utm_content=group3_Answer&utm_campaign=shareopn [8]https://zhuanlan.zhihu.com/p/354772550?utm_source=wechat_session&utm_medium=social&utm_oi=689250073002930176&utm_campaign=shareopn [9]https://www.infoq.cn/article/contrast-with-kafka-and-jingdong-jmq?source=app_share [10]https://www.infoq.cn/article/BF3mm9haDs-cdHCXOLlf?source=app_share [11]https://www.infoq.cn/article/wmM8WXzLEgfGMKYpbF0N?source=app_share
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。