上面两篇聊了Kafka概况和Kafka生产者,包含了Kafka的基本概念、设计原理、设计核心以及生产者的核心原理。本篇单独聊聊Kafka的消费者,包括如下内容:
Kafka消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。 Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 消费者组的设计是对消费者进行的一个横向伸缩,用于解决消费者消费数据的速度跟不上生产者生产数据的速度的问题,通过增加消费者,让它们分担负载,分别处理部分分区的消息。
在一个消费者组中的消费者消费的是一个主题的部分分区的消息,而一个主题中包含若干个分区,一个消费者组中也包含着若干个消费者。当二者的数量关系处于不同的大小关系时,Kafka消费者的工作状态也是不同的。看以下三种情况:
创建了Kafka消费者之后,接着就可以订阅主题了。订阅主题可以使用如下两个 API :
消息轮询是消费者API的核心,消费者通过轮询 API(poll) 向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只需要关注从分区返回的数据,然后进行业务处理。 一个简单的消费者消费的代码样例如下:
try {
while (true) {
// 轮询获取数据
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} finally {
consumer.close();
}
与生产者类似,消费者也有完整的配置列表。接下来一一介绍这些重要的属性。
消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。主要是为了降低消费者和Broker的工作负载。
broker 返回给消费者数据的等待时间,默认是 500ms。如果消费者获取最小数据量的要求得不到满足,就会在等待最多该属性所设置的时间后获取到数据。实际要看二者哪个条件先满足。
该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。
消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量。 异步提交 为了解决同步提交降低程序吞吐量的问题,又有了异步提交的方案。 异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待 Broker 的响应。代码样例如下:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// 异步提交并定义回调
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));
}
}
});
}
异步提交如果失败,错误信息和偏移量都会被记录下来。 尽管如此,异步提交存在的问题是,如果提交失败不能重试,因为重试可能会出现小偏移量覆盖大偏移量的问题。 虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试。可以通过一个 Map<TopicPartition, Integer> offsets 来维护你提交的每个分区的偏移量,也就是异步提交的顺序,在每次提交偏移量之后或在回调里提交偏移量时递增序列号。然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。 同步和异步组合提交: 当发生关闭消费者或者再均衡时,一定要确保能够提交成功,为了保证性能和可靠性,又有了同步和异步组合提交的方式。也就是在消费者关闭前组合使用commitAsync()方法和commitSync()方法。代码样例如下:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 因为即将要关闭消费者,所以要用同步提交保证提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}
上面的提交方式都是提交当前最大的偏移量,但如果需要提交的是特定的一个偏移量呢? 只需要在重载的提交方法中传入偏移量参数即可。代码样例如下:
// 同步提交特定偏移量
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
// 异步提交特定偏移量
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
上面的消费过程都是以无限循环的方式来演示的,那么如何来优雅地停止消费者的轮询呢。 Kafka 提供了 consumer.wakeup() 方法用于退出轮询。 如果确定要退出循环,需要通过另一个线程调用consumer.wakeup()方法;如果循环运行在主线程里,可以在ShutdownHook里调用该方法。 它通过抛出 WakeupException 异常来跳出循环。需要注意的是,在退出线程时最好显示的调用 consumer.close() , 此时消费者会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。 下面的示例代码为监听控制台输出,当输入 exit 时结束轮询,关闭消费者并退出程序:
// 调用wakeup优雅的退出轮询
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
// 等待主线程完成提交偏移量、关闭消费者等操作
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> rd : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
}
}
} catch (WakeupException e) {
// 无需处理此异常
} finally {
consumer.close();
}
【参考资料】 《Kafka 权威指南》