00:00
你好,我是杨思正。在开始分析卡夫卡consumer的具体实现之前,我们先来简单介绍一下卡夫卡consumer涉及到的一些基础理论。在第一课时介绍consumer group时提到,对于同一个consumer group来说,同一个topic不同会分配给不同的consumer进行消费,那如何分配这些呢?如果在有新的consumer加入或者是consumer宕机的时候,如何重新分配这些呢?这就是我们这节课要讲的consumer group reb协议。在卡夫卡最初的解决方案中,Consumer group rebance协议是依赖足keepper的water实现的。在这个方案里面。每个consumer group在keep中都维护了一个对应的路径,就是这个路径。这个路径下面使用临时节点记录了该consumer group中的所有的consumer ID。这个consumer ID是一个临时节点,它会在consumer启动的时候进行创建。另外,卡夫卡还会初始化owners和off这两个节点,这两个节点与ID节点同级。
01:08
我们来看下面这张图,在consumer group ID下面有三个节点,分别是ID owners和offs这三个节点。他们三个是同级的,其中owners这个节点记录了consumer和part之间的分配关系,这个节点用来记录对应consumer group在这个partition上的消费位置。除了consumer group相关的ZK节点之外呢,卡普卡还会在ZK上维护一些原数据,这些原数据呢,包括topics原数据。啊,比如说topics叫什么名字啊,它有多少个partition,以及partition的状态,还维护了brokeer的一些原信息,比如说这个broke的host port这些信息,每个consumer都会在这个节点。以及这个节点下面注册一个water。通过这个节点下面的water,我们就可以监听customer group中消费者的变化,通过这个节点上的water就可监听到卡夫卡集群中broke的增减,通过这两个consumer就可以监控到customer group状态以及卡普卡集群的状态了。原理上这个方案是可行的,但是这个方案呢,重度的依赖了ZK。
02:17
就会导致一些问题。当consumer很多的时候,一个consumer或者一个broke发生了变化,你比如说一个consumer加入或者退出了consumer group,或者是一个broker进行重启,就会导致全部的consumer接收到这个water通知,这个时候很多consumer并不关心这个事件,这就导致很多无用的water通知。这就会给ZK集群带来相当大的压力,也给consumer带来一些困扰。第二个问题就是每个consumer都依赖ZK集群来判断consumer group的状态以及broke的状态,但是不同的consumer在同一时刻可能连接到了ZK的不同的节点上。由于ZK只是保持最终一致性,它不是一个强一致性的集群,那么不同的consumer看到的原数据就有可能不一致,这就可能会造成多余的reb操作。
03:12
为了解决上述的这些问题,卡夫卡在后续的版本中对rebe方案进行了修改,这个改进之后的方案呢,就是我们下面要来介绍的一个re balance protocol改进方的核心思想就是将全部的consumer group分成多个子集,每个consumer group呢?在broke上选一个对应的group coordinator,由group coordinator管理对应的consumer group。主要就是用来管理consumer group的reb。每个broker都拥有成为group coordinator的能力。KA普卡通过添加一层group coordinator的方式,让所有的consumer不再直接依赖于做keepper,而是靠group coordinator在ZK上添加water。下面我们就来简单看一下这个1r reb protocol的reb流程。首先呢,当consumer准备加入consumer group或者是group coordinator发生故障转移的时候,也就是说这个时候consumer并不知道。
04:12
Group coordinator的host和port,这个时候consumer就会向卡普卡集群的任意一个broke发送find coordinator request请求。收到这个请求之后呢,ER就会返回response响应,其中就包含了负责管理。该consumer group的group continator的地址。接下来,Consumer会连接到这个group continator,并周期性的发送心跳。grouptin会通过心跳消息确定consumer是否正常在线。长时间收不到心跳消息的时候,Groupin就会认为consumer已经宕机了,这个时候他就会为consumer group触发新一轮的re balance操作。在consumer收到但有异常的心跳响应时,就表明group coordinator发起了reb操作。
05:01
此时,Consumer就会向groupinator。发送一个John group request,向groupinator表明自己要加入这个consumer group。在group cin发起re balance操作之后,它会等待一个时间窗口,主要目的是让consumer group中全部的consumer都完成join group request请求的发送。然后group coinator会结合ZK中的partition源数据。进行partition分配。在group codator分配完partition之后,会将partition与consumer之间的映射关系写入到JK集群中进行保存,同时还会将分配的结果通过joinone group response返回给consumer consumer会根据one group response响应中的分配结果消费对应的partition。同时还会定期的继续发送心跳请求,表明自己是在线的。如果后续再次出现consumer的上下线的上下线,或者是partition的增加等等这些情况,Group coordinator依然会通过带有异常的心跳响应来触发re balance操作,这个时候会跳到第三步。
06:09
继续进行rebell操作。上面这个方案看起来已经比较完美了,但是还是有一些问题,比如re balance策略是写在group coordinator中的,扩展性上多多少少有些问题。当我们需要使用新的re balance策略的时候,就需要改动group。这个时候我们就要重启或者是重新部署pro。为了解决上面的问题呢?卡不卡在0.9版本重新设计了一个re balance protocol,将partition分配的工作放到了consumer这一端进行处理。Group的管理工作依然是交给groupt进行处理。在这个版本的re balance协议中,将group request拆成了两个阶段,分别是John group阶段和schizing group阶段。我们来看一下升级之后的re协议。
07:00
第一步,当consumer通过findtinator request查找到其consumer group对应的groupinator之后。会进入join group阶段。下面我们看到的就是find coordinator request请求和find coordinator response响应的具体格式。我们可以看到,在find con response中,包含了node节点的ID host和port。在consumer收到find clinical response之后。Consumer会先向group发送group request请求,其中包含了consumer的相关信息。下面这张图就展示了卓文group阶段发送的请求。我们可以看到,在这个阶段中,这三个consumer都会向coordinator发送join group request请求,下面就是join group request请求的格式,其中携带了group ID。
08:00
过期时间和rebance的过期时间。还包括了。Member ID member ID就是coinator给这个consumer分配的一个ID,下面还有pro。这个集合记录了这个consumer支持的pole协议类型。在grouptinator收到join group request请求之后,会暂存consumer的信息。然后依然是等待一个窗口期,等待全部的consumer发送完join group request请求。接下来,Groupator会根据consumer的join group request请求来确定consumer group中可用的consumer,然后从这些consumer中选举一个consumer成为group leader。同时还会决定partition分配的具体策略,最后将这些信息封装成join group response,返回给前面选举出来的这个group leader。下面就是group response的具体格式,每个consumer都会收到group response响应,但是只有group leader收到的group response响应中封装了全部的consumer信息以及group leader的信息。
09:14
当consumer group中的一个consumer确定了自己成为leader之后,会根据consumer信息、卡夫卡集群的原数据以及partition分配策略进行partition分配。其他非leader的consumer在收到join group response之后不会进行任何操作。下面就展示了。Join group response响应的返回。我们可以看到,Coin为三个consumer都返回了johnone group response,其中只有CONSUMER1成为了leader。由他完成的分配,其他的CONSUMER2和CONSUMER3只是原地等待。接下来,所有的consumer进入snchron group阶段,所有的consumer会向grouptinator发送s group request请求,其中leader consumer的请求中会包含partition的分配结果。
10:09
普通的consumer。则发送的是一个空请求。下图就展示了group request请求的发送。我们可以看到,CONSUMER1作为leader会发送一个有效的seek group request请求,而CONSUMER2和CONSUMER3发送的则是一个空的group request请求。下面是group request请求的格式,在其中包含了partition的分配结果。接下来group coordinator。会将接收到的part分配结果封装成sick group response返回给所有的consumer。正如下图展示的这样,Cinator返回group response响应。这个时候,所有的consumer就知道自己该消费哪些partition了。完成了这一轮的reb之后,Consumer还是会与inator保持心跳。
11:02
与前面的协议一样,如果consumer长时间没有发送心跳,Coinator就会触发新一轮的re操作。下图就展示了心跳请求的发送过程,很明显,这次B协议的升级将reb的行为迁移到了consumer端,由consumer leader进行处理。由consumer完成了真正的partition分配,也就解决了之前提到的扩展性问题。在前面介绍的reb pro中,都存在一个比较严重的问题,那就是长时间的stop the world,这个stop the world并不是GC时候的stopor。而是在整个reb过程中,所有的petition都会被回收掉,Consumer长时间无法消费petition。例如一个consumer退出consumer group的时候,会发送live group request请求到下面就展示了这个live group request的请求发送的过程。
12:01
比如我们的CONSUMER1离开group的时候,就会发送live group request请求到cin,这个时候cinator在收到这个live请求之后,就会在心跳响应中携带异常信息。在CONSUMER2和CONSUMER3收到这个异常的心跳之后,就会触发下一轮的reb操作。在新一轮的re balance操作开始之后,我们的CONSUMER2和CONSUMER3就不能再消费任何的了。在有新的consumer加入consumer group的时候也是一样的。在实际生产中,Consumer的上下线是非常常见的操作,这样的话,每个consumer实力的更新都会导致两次balance操作。这样的话就会影响整个consumer group的消费进程。还有另一个问题就是consumer出现了长GC,比如下图展示,这样CONSUMER1碰到了长时间的GC,这个时候长时间没有收到consumer的心跳,他就会认为CONSUMER1下线了,这个时候他就会触发re balance操作,但是没过多久,CONSUMER1从GC中恢复了,这个时候他重新加入到consumer group中,这个时候又会再进行一轮re balance操作。
13:13
为了解决上述的问题,卡夫卡在2.3版本中引入了static membership协议相关的参考,我放在了这里。Static membership优化协议的核心是在consumer增加一个group点点ID配置。如果consumer启动的时候明确指定了这个配置项,Consumer会在join group request中携带,这个值表示consumer为static member。为了保证group instance.id这个配置的唯一性,我们考虑使用IP或者host name这个信息作为instance ID。在conin端会记录instance ID和member ID之间的映射关系,以及instance ID和partition的分配关系。当ator收到已知instance ID consumer的joinone group request请求时,不会进行repell操作,而是将其原来对应的partition分配给他。
14:07
Static membership协议可以让consumer group只在下面四种情况下才进行re操作。也就是有新consumer加入consumer group的时候。Group leader重新加入group的时候,或者是consumer下线的时间超过了session timeout,或者是tinator收到了static member的live group request请求时。这样的话,在使用static membership的场景下,只要consumer重新部署的时候不发送live group request请求。且在session timeout这个时间窗口内重新上线就不会触发reb操作。在卡夫卡2.4版本中,为了进一步减少reb带来的stop the world提出了cooperative reb协议,其核心思想就是将一次全局性的re balance操作改成多次小规模的re balance。
15:06
最终收敛到reb状态。在开始介绍coitive reb协议之前,我们先来明确reb协议中回收全部position的根本原因。这个根本原因就是一个partition只能分配给一个consumer进行消费。如下图展示的这样,Eagle reb协议中,Consumer为了满足这个需求,在发送johnone group request的时候就会停止所有的消费。直到consumer收到s group response。也就是新的分配结果之后,Consumer才能继续进行消费。如果将分布式系统简化成一个多线程的应用,整个re balance过程就相当于一个内存屏障。它的作用就是用来同步所有consumer的消费状态。说完了reb的本质之后,我们开始介绍cooperive reb协议,这个协议最核心的思想。
16:04
就是让consumer比较新旧两个partition的分配结果,只停止消费需要回收的partition。也就是说,如果新旧两次partition的分配结果中,同一个partition都分配给了这个consumer,这个consumer根本就没有必要停止消费。这也就解决了stop the word问题。上图就展示了一个consumer在一次reb中的比较操作,Owned partitions和assigned partitions分别是该consumer在reb前后需要处理的partition集合。其中,Consumer在整个re过程中都无需停止对UN change的这个集合的消费。介绍完coive re balance协议的核心思想之后,我们通过一个事例来介绍这个协议的工作原理。首先我们来看一个新consumer加入consumer group的过程,如下面这张图展示的这样。
17:00
当前有CONSUMER1和CONSUMER2这两个consumer consumer1消费的是一到三。CONSUMER2消费的是POSITION4到六。此时,CONSUMER3加入了consumer group。触发了新一轮的res。CONSUMER3此时会向coinator发送johnone group request的请求,触发第一轮re。In,在收到心跳请求之后。会通知CONSUMER1和CONSUMER2参与reb操作。CONSUMER1和CONSUMER2会将当前自己正在处理的partition信息记录到join group request请求中,发往ator。此时,CONSUMER1和CONSUMER2并不会停止对的消费,经过coin处理之后,会选举CONSUMER1作为group,然后cinator会发送给CONSUMER1JOHN group response coinator发送给CONSUMER1的John group response响应中就包含了各个consumer目前处理的partition信息。
18:07
CONSUMER1在收到响应之后开始进行的分配,这个时候他就决定把PART1和PART2继续分配给自己。把partition进行回收。然后把帕四和五继续分配给卡二,然后把帕六进行回收。接下来,CONSUMER1在完成这个分配之后,会通过s group request请求将分配结果发送给group c。会将CONSUMER1产生的分配结果通过CI group response返回给三个consumer。这个时候三个consumer收到的sick group response是这个样子,CONSUMER1继续消费,一和二三被回收掉了。CONSUMER2继续消费PART4和PART5 partition6被回收掉了,而CONSUMER3不会消费任何partition。到这里,第一轮的re balancell操作就结束了。在整个第一轮balance过程中,CONSUMER1、CONSUMER2依旧在消费自己的partition,在CONSUMER1和CONSUMER2收到s group response响应的时候,才会停止对PARTITION3和PART6这两个partition的消费。
19:22
同时立即发起第二轮reb操作。Consumer在John group request请求中依然会携带自己消费的信息,这些信息传到coin之后,Coin会通过join group response将这些信息发给group。这假设还是CONSUMER1 consumer1发现TOPIC1有六个,目前只分配了四个,还有一个consumer是空的,自然就会选择CONSUMER3来消费三和六。接下来经过s group request和s group response的交互之后,三和六就成功分配给了CONSUMER3。
20:01
这个时候,CONSUMER3就可以开始消费PART3和PART6了。到此为止,第二轮的re balancell操作就结束了,整个re balancell也就完成了。在整个过程中,CONSUMER1和CONSUMER2都没有停止对P1、P2以及P4、P5的消费,也就消除了全局的stop the world。接下来我们再来看一个consumer上下线的场景。当前我们依然是有三个consumer consumer2需要进行更新重启,这个时候CONSUMER2会离开consumer group。并且离开的时间超过了session timeout。此时,Coin就会触发第一轮的reb。首先,Calling会通过心跳通知CONSUMER1和CONSUMER3发起一轮re balance consumer1和三会将自己正在处理的信息通过join group request发往c nature,也就是这个请求。C这里选择CONSUMER1作为group leader会将目前的分配状态通过整group response返回给CONSUMER1 consumer1发现PART4和PART5这两个potential并没有出现,也就是说这两个part处于lost的状态。此时consumer一并不会立即去解决当前不平衡的问题,而是返回一个不变的partition分配结果。
21:20
In会根据CONSUMER1返回的这个结果,通过group response返回给剩余两个存活的consumer。具体返回的结果如下所示。也就是说,CONSUMER1,正常消费P1P2。CONSUMER3,继续消费P3P6。这里会多了一个delay信息。这个类信息就是一个时间窗口。也就是最长等待CONSUMER2恢复的一个时间窗口。到此为止,第一轮re操作就结束了。在整个第一轮re balance过程中,CONSUMER1和CONSUMER3并不会停止partition的消费。
22:03
在delay这个时间窗口内,CONSUMER2故障恢复重启成功了,会重新加入到我们的consumer group中,此时它会向发送one group request请求,触发第二轮rebe操作。CONSUMER1和CONSUMER3在收到异常的跳之后,就会发送join group request请求,参与第二轮的re balance。当然,Join group request请求也会携带自身正在消费的信息。在第二轮的reb中,Consumer依依然被选做了group leader。他会检查这类时间是否已经到了,如果没有到的话,依旧不会立即解决不平衡的问题,而是继续返回目前使用的分配结果。返回的信息如下,依然是CONSUMER1。消费P1P2。CONSUMER3消费P3P6,但是注意这里的delay时间进行了更新。更新之后的delay时间就是窗口剩余的时间。
23:04
到此为止,第二轮的re balance操作就结束了。整个reb过程中。CONSUMER1和CONSUMER3并没有停止消费。当delay时间到期之后,Consumer会重新发送join group request请求,触发第三轮re balance操作。John group的请求依然携带了当前的分配信息。在此次re操作中,CONSUMER1依旧被选成了,他会发现delay时间已经到期,此时才会真正开始解决不平衡的问题。它会对partition进行重分配,最新的分配结果会通过group response返回给各个consumer,返回的结果依然是CONSUMER1消费P1P2 consumer3消费P3P6。而CONSUMER2。则依旧消费原来的P4、P5。到此为止,第三轮的re balance操作就结束了。整个re过程中,CONSUMER1、CONSUMER3并没有停止消费。
24:04
最后,我们再来看consumer永久宕机的场景。这里依然是三个consumer。其中CONSUMER2永久的退出了consumer group。与前面介绍的重启场景不同的是,CONSUMER2退出consumer group的时间超过了我们的delay时间。其他consumer会认为它永久退出了。这个时候直接进行上图展示的第二轮reb操作,重新分配partition。整个这个过程与前面consumer重启的过程非常类似,这里就不再重复介绍了。最后我们再深入介绍一下。Cooperative reb协议的本质。正如下面这张图展示的这样,它使用多轮reb的方式实现了synchronization barrier的效果,也就避免了一次回收全部partition的效果。从而避免了stop the world。
25:02
本课时我们重点介绍了consumer group re balance协议的演进和各个版本协议的具体原理。下一课时,我们将开始结合卡夫卡consumer的代码分析分配的具体策略。本课时相关的文档和视频还会同步更新到微信订阅号以及B站当中,我们下一课时再见。
我来说两句