本文我们一起来学习消费者组重平衡相关的知识。
先解决前面留下的问题:如果生产者已经发送了大量消息,但在最后提交之前突然宕机,事务协调器会如何处理这个未完成的事务呢?
答案是自动终止,事务协调器如果在一段时间内没有收到生产者的任何消息或者提交事务的请求,会利用 __transaction_state 中记录的信息向所有涉及到的分区发送 Abort 命令,写入的消息也会被标记为废弃。这个时长是由 transaction.timeout.ms 参数控制的,默认是 1 分钟(值为 60000)。
回答完遗留的问题,我们进入今天的正题,第一个问题是什么是消费者组(Consumer Group)?
这是 Kafka 中比较有亮点的设计。简单来说,Consumer Group 就是 Kafka 提供的可扩展且具有容错性的消费机制。消费者组内部包含多个消费者实例,它们共享一个 Group ID。组内的所有消费者实例共同订阅一个或多个 Topic 的所有分区。每个分区只能由同一个消费者组的一个消费者实例来消费。
消费者组有以下特性:
聊完消费者组的定义和特性之后,我们再来看一下它最核心,也是最让人头疼的机制——重平衡。重平衡本质上是一种自动调度的机制,它确保 Topic 的所有分区都有对应的消费者来消费,并且分配的相对均衡。
触发重平衡的条件有三个:
Kafka 为消费者组定义了 9 种状态,它们的含义如下:
状态 | 含义 |
|---|---|
UNKNOWN | 未知状态,通常是客户端与 Broker 的版本不兼容,或者发生了无法识别的异常。 |
PREPARING_REBALANCE | 重平衡准备中,协调器收到了加入申请或者心跳超时,准备重新分配。 |
COMPLETING_REBALANCE | 重平衡同步中,所有成员都已经加入组,Leader 正在计算协调方案。 |
STABLE | 稳定状态,重平衡完成。 |
DEAD | 消费者组注销,元信息在协调者端已被移除。 |
EMPTY | 组内没有任何成员,通常是刚创建或者所有的消费者都正常关闭,元信息依然保留在协调者端。 |
ASSIGNING | 分配中,这是 2.4+ 版本新引入的,Leader 计算好了一部分新的分配计划,正准备下发。 |
RECONCILING | 协调中/一致性同步中,也是 2.4+ 版本新引入的,成员正在释放不再属于自己的分区,准备接手新的分区。 |
NOT_READY | 未就绪,通常是组刚刚启动,或者协调者端在进行迁移。 |
状态流转流程如下图

KafkaGroupState
接着我们来看一下最经典的重平衡过程。
这就是一次完整的重平衡流程,这里有一个问题是:在这整个过程中,消费者都是不处理消息的,也就是我们常说 Stop-The-World 问题。如果你有几百个 Consumer 实例,那么一次 Rebalance 可能需要几个小时,这简直令人崩溃。
基于这种问题,Kafka 在 2.4 版本推出了增量协作重平衡机制。这种机制下,重平衡的过程不再要求所有的消费者都放下手中的工作,而是只处理那些需要变动的分区,这样就极大的提升了稳定性。只需要在大于 2.4 版本的客户端中将 partition.assignment.strategy 设置为 CooperativeStickyAssignor 即可。
新版本重平衡流程如下:
Kafka 通过多次小的调整,来避免整个集群长时间停止工作,以此来减少重平衡对于整体集群的影响。这一进化是不是有点像 JVM 的 GC 从传统垃圾回收器进化到 G1 和 ZGC。
虽然新版本的重平衡机制有了很大的进步,但还是会对系统性能造成一定的影响。那如何才能避免重平衡呢?
首先,完全消除重平衡是不可能的。我们要做的就是消除掉非预期的重平衡。什么是非预期的呢?你可以理解为是由于配置不当或者系统抖动引起的重平衡。
我们分别从参数调优、代码健壮性和架构设计三个层面来看一下如何调整。
首先是参数调优,非预期重平衡触发最常见的两个原因,一个是心跳超时,另一个是逻辑处理超时。
为了避免因为网络抖动导致误判心跳超时,我们可以适当调大 session.timeout.ms,这个参数决定了 Consumer 存活性的时间间隔,除了这个参数,还需要调整 heartbeat.interval.ms,这个是用来控制发送心跳消息的频率的。发送的越频繁,协调者越能更快响应 Consumer 掉线并开启重平衡,但随之而来的问题是消耗的资源也越多。通常可以把它设置为 session.timeout.ms 的三分之一。
逻辑处理超时的参数主要是 max.poll.interval.ms,它用来控制两次 poll 之间的间隔,如果你的业务逻辑复杂,需要处理时间比较长,那么就需要调大这个参数。例如你在业务代码中访问了第三方存储,整个过程需要 5 分钟,那么这个参数可以设置为 6 分钟。除了调大 max.poll.interval.ms 之外,我们也可以调整 max.poll.records,它是用来控制每次 poll 的消息条数,通过减少消息条数,能够缩短 poll 一次的逻辑处理时间。
介绍完了参数调优之后,我们再来看一下代码层面有哪些需要调整或者注意的地方。首先是最基本的 try-catch 防护,我们应该确保所有的异常都在消费逻辑中处理掉,一旦消费者因为没捕获异常而崩溃,那么必然会触发重平衡。
其次就是优雅关闭 Consumer,在停止时手动调用 consumer.close(),这样会给协调者发送 LeaveGroup 请求,协调者收到请求后可以立即开启重平衡,缩短“空窗期”。
在架构层面,除了使用我们前面提到的增量协作重平衡协议之外,还可以设置 group.instance.id,这是为每个消费者实例设置一个固定的 ID,这样在实例重启时,只要在 session.timeout.ms 时间内回来,协调者都会认出它,不会触发重平衡。
本文我们先了解了什么是消费者组,一句话概括就是它是 Kafka 提供的可扩展且具有容错性的消费机制。接着又聊了重平衡机制,包括消费者组的状态以及重平衡的整个流程。最后我们介绍了如何避免非预期的重平衡,这能帮助我们提升 Kafka 集群的稳定性。