本篇文章主要讲解消费者和Partition的关系以及消费的一些相关问题。
通过前几篇文章我们知道在Partition中,消息是不会删除的,所以才可以追加写入,写入的消息是连续并且有序的。
这种特性决定了kafka可以消费历史消息,而且按照消息的顺序消费指定消息,而不是只能消费队头的消息。
正常情况下我们希望消费没有被消费过的数据,而且是从最先发送(序号最小的)的开始消费(这样才是有序和公平的)。
对于一个Partition,消费者组(Consumer Gup)怎么才能做到接着上次消费的位置(offset)继续消费呢?
肯定需要将这个对应关系保存起来,下次消费的时候查找一下。(还有一种方式是根据时间戳消费)
首先对应关系确实是可以查看到的。比如消费者组:test-group-1 和 test-topic(5个分区)的partition的偏移量关系,可以使用如下命令查看
./kafka-consumer-groups.sh --bootstrap-server localhost --describe --group test-group-1
PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID |
---|---|---|---|---|
0 | 5 | 5 | 0 | consumer-1 |
1 | 5 | 5 | 0 | consumer-1 |
2 | 5 | 5 | 0 | consumer-1 |
3 | 5 | 5 | 0 | consumer-2 |
4 | 5 | 5 | 0 | consumer-2 |
CURRENT-OFFSET:指的是下一个未使用的offset。
LEO(LOG-END-OFFSET):下一条等待写入的消息的offset(最新的offset + 1)
LAG:延迟量
注意:消费者与topic的关系是一个consumer group 和 topic 中的一个partition的关系,不是一个消费者和一个topic的关系。
offset的对应关系到底是保存在哪里的呢?
首先可以排除不会在消费者本地的,因为所有消费者都可以使用这个consumer group id,放在本地是做不到统一维护的,肯定要放到服务端。
kafka早期的版本把消费者组和partition的offset直接维护在ZK中,但是读写的性能消耗太大了。后来就放在一个特殊的topic中,名字叫_consumer_offsets,默认有50个分区(offset.tipic.num.partitions=50),每个分区默认一个replication。
./kafka-topics.sh --topic __connsumer_offsets --describe --zookeeper localhost:2181
看起来这些分区副本在3个Broker上非常均匀和轮流地分布。
这样一个特殊的Topic怎么存储消费者组test-group-1对于分区的偏移量呢?
Topic里面是可以存放对象类型的value的(经过序列化和反序列化)。这个Topic里面主要存储的两种对象:
GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者都有编号)。
OffsetAndMetadata:保存了消费者组和各个partition的offset位移信息元数据。
./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
以上都是Broker有记录的offset的情况,如果说增加了一个新的消费者组去消费一个topic的某个partition,没有offset记录,这个时候应该从哪里开始消费呢?
什么情况下找不到offset?
就是你从来没有消费过,没有把当前的offset上报给Broker。
消费者代码中有一个参数,用来控制如果找不到偏移量的时候从哪里开始消费。
auto.offset.rest=latest
参数 | 说明 |
---|---|
latest(默认) | 从最新的消息开始消费(最后发送的)。历史消息是不能消费的 |
earliest | 从最早的消息开始消费(最先发送的)。可以消费到历史消息 |
none | consumer group 在服务端找不到offset会报错。 |
anything else | 向消费者抛出异常。 |
上边讲了消费者组的offset是保存在Broker的,但是,是由消费者上报给Broker的。并不是消费者组消费了消息,offset就会更新,消费者必须要有一个commit的动作。就跟RabbitMQ中消费者的ACK一样。
同样的,消费者可以自动提交或手动提交。由消费端的这个参数控制:
ennable.auto.commit=true
默认是true。true代表消费者消费消息以后自动提交此时的Broker会更新消费者组的offset。
还有一个参数可以控制自动提交频率:
auto.commit.interval.ms=5
如果我们要在消费完消息做完业务逻辑处理之后才commit,就要把这个值改成false。如果是false,消费者就必须要调用一个方法让Broker更新offset。
有两种方式:
consumer.commitSync() 的手动同步提交。
consumer.commitAsync()手动异步提交。
如果不提交或者提交失败,Broker的offset不会更新,消费者组下次消费的时候会消费到重复的消息。
多个consumer group和partition的关系?
重复消费。任何一个消费者组,都会把一个topic的所有partition分配完。
上边我们说过,一个消费者组里面的一个消费者,只能消费Topic的一个分区。
如果分区数量跟消费者数量一样,那就一人消费一个。如果是消费者比分区多,或者消费者比分区少,这时消费者跟分区的关系是怎样的呢?
如果消费者比分区多,肯定有一些消费者消费不到(空闲)。
如2个消费者消费5个分区,如果分配呢?
创建一个5个分区的topic。
./kafka-topic.sh --create --zookeeper localhost:2181 --partition 5 --replication-factor 1 --topic test-5-part
实际上是采用了默认策略:RangeAssignor
分策略图:
StickyAssignor:这种策略复杂一点,但相对来说均匀一点(每次的结果都可能不一样)原则:
有两种情况需要重新分配分区和消费者的关系:
为了让分区分配尽量地均匀,这个时候会触发rebalance机制。
分区重新分配可以分成以下几步: