1
Kafka消费模式
从kafka消费消息,kafka客户端提供两种模式: 分区消费,分组消费。
分区消费对应的就是我们的DirectKafkaInputDStream
分组消费对应的就是我们的KafkaInputDStream
消费者数目跟分区数目的关系:
1),一个消费者可以消费一个到全部分区数据
2),分组消费,同一个分组内所有消费者消费一份完整的数据,此时一个分区数据只能被一个消费者消费,而一个消费者可以消费多个分区数据
3),同一个消费组内,消费者数目大于分区数目后,消费者会有空余=分区数-消费者数
2
分组消费再平衡策略
当一个group中,有consumer加入或者离开时,会触发partitions均衡partition.assignment.strategy,决定了partition分配给消费者的分配策略,有两种分配策略:
1,org.apache.kafka.clients.consumer.RangeAssignor
默认采用的是这种再平衡方式,这种方式分配只是针对消费者订阅的topic的单个topic所有分区再分配,Consumer Rebalance的算法如下:
1),将目标Topic下的所有Partirtion排序,存于TP
2),对某Consumer Group下所有Consumer按照名字根据字典排序,存于CG,第i个Consumer记为Ci
3),N=size(TP)/size(CG)
4),R=size(TP)%size(CG)
5),Ci获取的分区起始位置=N*i+min(i,R)
6),Ci获取的分区总数=N+(if (i+ 1 > R) 0 else 1)
2,org.apache.kafka.clients.consumer.RoundRobinAssignor
这种分配策略是针对消费者消费的所有topic的所有分区进行分配。当有新的消费者加入或者有消费者退出,就会触发rebalance。这种方式有两点要求
A),在实例化每个消费者时给每个topic指定相同的流数
B),每个消费者实例订阅的topic必须相同
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
其中,topic对应的value就是流数目。对应的kafka源码是在
在kafka.consumer.ZookeeperConsumerConnector的consume方法里,根据这个参数构建了相同数目的KafkaStream。
这种策略的具体分配步骤:
1),对所有topic的所有分区按照topic+partition转string之后的hash进行排序
2),对消费者按字典进行排序
3),然后轮训的方式将分区分配给消费者
3,举例对比
举个例子,比如有两个消费者(c0,c1),两个topic(t0,t1),每个topic有三个分区p(0-2),
那么采用RangeAssignor,结果为:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
采用RoundRobinAssignor,结果为:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
4
分组成员的存活检测
分组消费有一个比较好的功能就是自动检测失败的消费者并将其踢出分组,然后重新进行分区分配。那么kafka是如何检测失败的消费者的呢。我们就拿0.10.x为例进行讲解说明。
消费着订阅了一组的topic后,会在调用poll(long)函数的时候加入分组,分组内新增消费者就会进行再平衡。Poll 函数的设计目标就是来保证消费者存活的。只要持续不断的调用poll函数,消费者就会留在分组里,连续的从分配给他的分区里消费消息。消费者也会使用一个后台线程发送周期性的心跳给broker。如果消费者挂掉或者无法在session.timeout.ms时间范围内发送心跳,消费者会被视为死亡,它的分区就会被重新分配。session.timeout.ms默认是10000ms。该值要在group.max.session.timeout.ms=300000ms和group.min.session.timeout.ms=6000ms之间。
由于心跳是后台线程周期性发送的,那么会存在消费者心跳正常发送,但是不消费消息的情况。为了避免这种消费者无限期的占用分配给他的分区这种情况,kafka提供了一种存活检测机制,使用max.poll.interval.ms配置。根本上来说,两次调用poll函数的间隔大于该值,消费者就会离开分组,然后它的分区会被其它消费着消费。当发生这种情况时,你会收到一个offset提交失败的异常。这种机制确保了只有活跃的消费者才能提交offset。
消费者有两个配置来控制poll函数的行为:
有些情况下,数据处理时间不可预期,上面的两个参数并不难满足需求。这种情况下,推荐将消息处理放到其它后台线程中执行,这样消费者就可以持续的调用poll函数了。但是这中情况下,要处理好offset提交的问题。典型做法就是禁止掉自动提交offset,改为手动再消息处理结束后提交offset。这种情况下,需要对消费的分区调用pause函数,这样在调用poll函数的时候就不会接受新的数据,然后处理完之后调用resume(Collection)即可恢复消费。