以下特点实现了了kafka的消费者设计思想:基于队列和基于发布/订阅者模式的 生产-消费模型。
生产一个消息:
启动两个consumer,这两个consumer属于同一个group
启动两个consumer,这两个consumer属于不同group
这时我们明白了消费组id的背后实际意义,一般我们会设置组id为一个跟业务相关的名字。
重平衡也可以理解成一个后端的分配机制,使得负载均衡或者部分节点失活扔能达到系统可用。例如说一个消费组包含10个消费者。订阅了50个topic,那么每个消费者会平均得到5个topic。
类似于分布式的管理协议,该组会协商出一个协调者(coordinator)。该协调者负责管理组的状态。
触发rebalance的条件有:
一个例子是:
当new第二个consumer
这时候老的consumer会出现
消费者保存当前消费消息的位置。也就是下一次消费的位置。这个位移消息保存在broker服务端。服务端保存这些offset需要内存,同时需要和客户端同步位移信息。一般同步做法是引入位移信息。还引入了检查点机制(这是什么?)。
在消费者设计的原则上有如下准则:最多一次(at most once)、最少一次(at least once)、有且只有一次(exactly once)。
所以这涉及到consumer在消费之前提交位移还是,处理完消息再提交位移,因为消费者在取到消息和处理完消息之间可能发生崩溃。那么消费者重启到底是从哪个位移消费。kafka默认是at least once方案,也就是说处理完消息之后再提交位移。如果能够支持事务,那么这个设计可以提升到exactly once。
内部topic名字为__consumer_offsets用来保存消费者提供的offset。消费者的位移提交会在__consumer_offsets-<某个分片>写上一条消息。消息key是group id + topic + 分区,value是偏移量,如果一个group的一个conumer对同一个topic分区提交了多次,那么kafka会使用compact策略保存最新的一次提交位移
位移提交又分为自动提交和手动提交。
在用户订阅了topic之后,poll以事件循环开等待读取消息。可以触发的消息包括coordinator协调消息,消费组内部的reblace消息,和生产者写入topic的消息。这样达到一个线程IO管理所有事件。
在poll进行消息等待,可以设置以下策略来退出等待,处理消息。
一个实例只允许运行在一个线程方案,这是由于减少引入同步、锁机制带来的性能折损。建议使用单线程方案。在消费者poll消息进入一个循环体,我们用isRunning变量控制循环运行。如果程序执行进入到其他线程,那么主动设置isRunning=false来结束consumer。主动调用consumer.close会及时告知开启新一轮的reblance。
,sessiong.timeout.ms设置较低值,max.poll.interval.ms设置成实际处理耗时。既可以快速检测奔溃,又可以处理逻辑不会引起没必要的reblance
fetch.max.bytes:消费的最大字节数,如果过大超过配置值,则无法消费。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。