文档中心>消息队列 CKafka 版>实践教程>消费进度未及时同步到服务端

消费进度未及时同步到服务端

最近更新时间:2024-12-02 12:03:22

我的收藏

问题现象

CKafka 消费进度未及时更新服务端,无法通过控制台看到消费速度。

前置储备

Ckafka 集群展示消费进度,是以客户端消费位移 offset 的提交频率来评估消费速度和消费进度,故遇到此类问题优先排查客户端消费提交 offset 相关情况。


排查指引

步骤1:检查是否有生产数据写入

如果发现消费没有出现堆积和消费速度,可检查 Topic 以及生产监控指标,查看是否有数据写入。
1. 进入 控制台,单击实例列表 > ID/名称 > Topic 管理,查看对应 Topic 相关分区末端 offset 是否会更新。

2. 进入 控制台,单击实例列表 > ID/名称 > 监控 > Topic,查看对应 Topic 相应生产流量、Topic 占用磁盘的消息总量等指标是否大于 0。

如果 Topic 无实时数据写入,有下述两种情况:
新建消费组,配置 auto.offset.reset=latest 行为,则会从最新开始消费,由于没有实时数据写入,不会产生 commit offset 行为;
Topic 数据过了保留时间,配置 auto.offset.reset=earliest 行为,则会从最头开始消费,由于数据已经 retention,消费不到数据,也不会产生 commit offset 行为。
上述情况不产生 commit offset 行为,故而,服务端不会更新消费进度。

步骤2:检查客户端配置

对于 Kafka 原生 Client,需要检查 Consumer 配置,是否开启自动提交 offset:enable.auto.commit

原生 Kafka Java Client 举例

一种直接检查客户端配置,还有一种方式可以从程序的日志里面搜索 ConsumerConfig,如下图所示。该日志只在 KafkaConsumer 初始化时打印,如果日志保留时间较短可能搜索不到。

依据客户端消费特点,如果可以开启自动提交 offset,则可以打开该参数,客户端消费进度将更新到服务端;
如果需要手动管理 offset 的提交,比如对重复消费或丢数有更严格要求,enable.auto.commit 被关闭的情况下,需要检查客户端程序,是否在消费过程中,同步或者异步执行 offset 的提交动作,例如 consumer.commitSync()
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(500);
for (ConsumerRecord record : consumerRecords) {
System.out.printf("收到消息: partition:%d, offset:%d, time:%d, key:%s, value:%s\\n", record.partition(), record.offset(), record.timestamp(), record.key(), record.value());
}
consumer.commitSync();
}
原生 Kafka Client 经过上述排查调整后,再次观察客户端消费进度是否更新到服务端。

针对 Kafka 流式场景,Flink 出现频率相对较多

对于 Flink 消费客户端,其底层与 Kafka 的访问集成至 Flink的Kafka Source 接口,并与 Flink 框架容错机制相结合,比如 checkpoint 和 savepoint 机制。在这里就会面对两种情况:Kafka Client 底层 enable.auto.commit 处理和 Flink checkpoint 对 offset 的处理。
Kafka Source 在 checkpoint 完成时提交当前的消费offset ,以保证 Flink 的 checkpoint 状态和 Kafka broker 上的提交位点一致。
如果未开启 checkpoint,Kafka Source 依赖于 Kafka consumer 内部的位点定时自动提交逻辑,自动提交功能由 enable.auto.commit 和 auto.commit.interval.ms 两个 Kafka consumer 配置项进行配置。需要注意的是,如果用户在使用 Flink Kafka Source 时没有主动在配置中指定enable.auto.commit,Flink框架将会覆盖 enable.auto.commit=false,即关闭自动提交 offset,如果 Flink 未开启 checkpoint,则需要用户手动开启该参数。