序
本文主要研究一下kafka0.8.2.2版本中ConsumerFetcherManager的MaxLag指标的统计。
问题
使用jmx查询出来的MaxLag跟使用ConsumerOffsetChecker查出来的总是不一样,几乎是jmx查出来的是0,但是实际是存在lag的。这里探究一下这个MaxLag的计算。
AbstractFetcherManager
kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherManager.scala
具体newGauge是调用KafkaMetricsGroup的方法
重点看这个计算逻辑,所有的数据都在fetcherThreadMap里头,key是BrokerAndFetcherId,value是AbstractFetcherThread,具体实例的类是ConsumerFetcherThread,它继承了AbstractFetcherThread
AbstractFetcherThread.fetcherLagStats
AbstractFetcherThread里头有个重要的字段,就是fetcherLagStats。
AbstractFetcherThread#FetcherLagMetrics
lag值的更新
lag值的更新在AbstractFetcherThread#processFetchRequest
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
这个是在AbstractFetcherThread#doWork方法里头
AbstractFetcherThread#doWork
ShutdownableThread#run
ConsumerOffsetChecker
kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala
主要是这个processPartition进行获取lag的逻辑
里头依赖的offsetMap获取逻辑如下
大体的逻辑就是
构造OffsetFetchRequest,获取consumer在topic的每个partition的消费的offset信息
构造OffsetRequest,获取topic的每个partition的logSize
logSize - consumer的offset = lag
小结
HighWaterMark
问题可能就在这个HighWaterMark:
ConsumerFetcherManager使用HighWaterMark - newOffset
ConsumerOffsetChecker调用SimpleConsumer的getOffsetsBefore,获取的是leaderEndOffset,即leaderEndOffset - newOffset
HighWaterMark取的是partition对应的ISR中最小的LEO,消费者最多只能消费到HW所在的位置
毫无疑问使用leader的offset肯定比使用HighWaterMark的数据要大,这样在replica延迟大的时候,表现更为明显
但是实际情况,即使消费端故意模拟耗时消费处理,也不见得这个数据变大,几乎总是0,因此问题还不是这个HighWaterMark
messages.lastOption
最后调试了一次,进入AbstractFetcherThread里头,看到这段数据的真实值,才恍然大悟
原来这里统计的是fetcher拉取的最新数据的offset与partition的HighWaterMark的差值,而拉取回来是放到一个内存队列里头让业务消费线程去消费的;它衡量的fetcher拉取的速度,而不是消费者消费的速度,要看消费者与生产者的lag值,就得使用ConsumerOffsetChecker去检查。
看来还真的不能望文生义,被坑了一天
doc
Kafka数据可靠性与一致性解析
AbstractFetcherThread
ConsumerFetcherManager MaxLag
领取专属 10元无门槛券
私享最新 技术干货