在N久之前,曾写过kafka 生产者使用详解,
今天补上关于 offset 相关的内容。
那么本文主要涉及:
这里额外提一句就是,客户端从scala 语言转向 java, 并不是 java 比 scala 要怎么怎么样, 仅仅只是因为社区的开发者换人了
一个正常的消费逻辑需要具备以下几个步骤:
消费者可以订阅多个Topic,
consumer.subscribe(Arrays.asList("t1","t2"))),
如果订阅多次,后面的会覆盖前面的,
所以取消订阅其实也可以去订阅一个空集合。
订阅支持正则表达式:
consumer.subscribe(Pattern.compile("topic .*"));
这样订阅后,如果kafka后面新增了满足该正则的 Topic也会被该消费者消费
消费者也可以直接订阅某个分区的数据,
这里我们贴下代码,如下:
List<TopicPartition> partitions = new ArrayList<>();
// 查询kafka分区信息
List<Partitioninfo> partitioninfos = consumer.partitionsFor( topic );
if (partitioninfos != null) {
for (Partitioninfo tpinfo : partitioninfos) {
partitions.add(new TopicPartition( tpinfo.topic(), tpinfo.partition() )) ;
consumer.assign( partitions ) ;
值得注意的是:
subscribe订阅是具有分区在均衡能力的,
而 assign 是没有的
这里我们只是简单的过了一下 消费者, 因为不是本文的重点, 如果要详细了解的话, 还是去看看这篇 kafka 生产者使用详解
这里指的是消费者消费的位移,
而不是Kafka端储存的消息的 offset,
这其中的区别希望读者清楚,不要混淆了。
对于offset 的提交,
我们要清楚一点
如果我们消费到了 offset=x 的消息
那么提交的应该是 offset=x+1,
而不是 offset=x
kafka的提交方式分为两种:
在Kafka 中默认的消费位移的提交方式是自动提交,
这个由消费者客户端参数enable.auto.commit
配置,
默认值为true。
当然这个默认的自动提交不是每消费一条消息就提交一次,
而是定期提交,
这个定期的周期时间由客户端参数auto.commit.interval.ms
配置,
默认值为5 秒,
此参数生效的前提是enable.auto.commit
参数为true。
自动位移提交的动作是在poll()方法的逻辑里完成的,
在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,
如果可以,那么就会提交上一轮消费的位移。
- 批量提交 该方式的最大问题在于数据是批量处理,
当部分数据完成消费,
还没来得及提交offset就被中断,
则会使得下次消费会重复消费那部分已经消费过的数据。
final int minBatchSize = 200;
List<ConsumerRecord> buffer= new ArrayList<>() ;
while ( isRunning.get() ) {
ConsumerRecords<String , String> records = consumer . poll(1000) ;
for (ConsumerRecord<String , String> record : records) {
buffer.add(record);
if (buffer.size() >= minBatchSize) {
//do some logical processing with buffer .
consumer.commitSync() ;
buffer.clear();
}
}
- 单条消息提交一次 该方式每消费一次,就保存一次。
虽然在很大程度上避免了重复消费,
但是其性能是极其低下的,
基本不在企业级考虑的范围,
并且也不是完全的能做到精准一次消费
while ( isRunning. get () ) {
ConsumerRecords<String , String> records= consumer.poll(1000) ;
for (ConsumerRecord<String , String> record : records) {
//do some logical processing.
//读取消费的消息的 offset
long offset= record.offset() ;
TopicPartition partition =new TopicPartition(record.topic() , record.partition()) ;
// 提交位移
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1))) ;
}
}
- 按分区提交 该方式其实是综合了 批量提交 和 单条消息提交一次,
按分区的小批次提交,
如果你要使用同步提交的方式,
那么建议你使用该方式
try {
while (isRunning.get() ) {
ConsumerRecords<String , String> records= consumer .poll(1000);
for (TopicPartition partition : records.partitions( )) {
//取出每个分区的消息
List<ConsumerRecord<String, String> partitionRecords = records . records(partition)
for (ConsumerRecord<String , String> record : partitionRecords) {
//消费该分区的消息*****
//*********
//将该分区的 offset 提交
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1) .offset() ;
consumer.commitSync(Collections.singletonMap ( partition ,
new OffsetAndMetadata(lastConsumedOffset + 1))
);
}
}
}
}finally {
consumer.close();
}
//三个重载方法
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets ,OffsetCommitCallback callback)
commitAsync
属于异步提交,
也就是不会阻塞线程,
比起同步提交commitSync
具有更好的性能。
这里我们主要来讨论下OffsetCommitCallback callback
回调的使用,
理解起来很简单,我们每提交一次 Offset,
callback 都会告诉我们是否提交成功。
那么如果我们提交失败了怎么办呢??
- 一般的想法就是:失败了?那重新提交呗。 这种方式是否可行?我们看下面这个列子。
如果一个消费者消费到了 offset=10,
我们就异步提交了 offset=11,
继续拉取消息 offset=11-20,
这个时候 提交的 offset=11 还没有返回成功,
我们提交 offset=21,
返回 offset=21 提交成功。
OK,现在提交 offset=1的那条消息返回了,
并且是失败的,
那么如果你去重试,
提交 offset=11 就会覆盖掉 已经提交的 offset=21
很明显这不是我们想要的。
- 正确的做法:
这个时候需要客户端维护一个序列号,
每次提交成功都 +1,
重试的时候进行对比,
不合法就不需要重试了。
当然实际情况,
一般提交offset不会失败,
并且就算失败一次也不会有问题,
因为后面每次消费一样会进行offset提交,
而对于消费者正常退出,
我们可以使用,
try {
while(isRunning.get()) {
//poll records and do some log 工cal processing .
consumer . commitAsync() ;
}
) finally {
try {
consumer.commitSync() ;
) finally {
consumer.close() ;
}}
- 再均衡导致的重复消费: 再均衡发生的时候也可能会导致消费者的offset来不及提交,
这时候我们需要在监听到再均衡发生的时候进行一次offset提交:
//该对象需要保存该消费者消费的分区的最新的 offset
//本段代码中没有体现,可以在消费数据之后 进行更新该对象
Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ;
consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () {
//发生在 再均衡 之前,并且消费者停止读取消息的时候
@Override
public void onPartitionsRevoked(Collection<TopicPart ition> partitions) {
consume.commitSync(currentOffsets) ;
currentOffsets.clear();
}
@Override
public void onPartitions Assigned(Collection<TopicPartition > partitions) {
//do nothing .
}
} );