前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >厉害了!一文撕开Kafka Compact Topic神秘面纱

厉害了!一文撕开Kafka Compact Topic神秘面纱

作者头像
用户9421738
发布2024-08-06 15:18:06
1200
发布2024-08-06 15:18:06
举报
文章被收录于专栏:大数据从业者

背景

随着平台Kafka的对接客户越来越多,我发现很多人只知道Kafka Topic可以根据设置保存大小和保存时间触发数据清理机制,但是并不熟悉Kafka Topic另一种清理策略compact。遂有此文,本文主要介绍compact原理、相关配置、实践案例操作记录、相关源码分析等内容。欢迎关注微信公众号:大数据从业者

Compact原理

Kafka数据清理策略是由log.cleanup.policy参数决定的,当前支持两种策略:delete(普通主题默认)、compact(系统主题默认)。两种策略可以同时使用,互不冲突。所以,log.cleanup.policy可以设置为delete或compact或delete,compact。本文暂不涉及delete清理策略,只讲述compact清理策略。Kafka系统主题__consumer_offsets默认清理策略就是compact。

强调一点:compact策略仅对Topic内同时携带key和value的消息有效。换句话说,如果需要使用compact策略,那么producer发送的消息需要同时携带key和value。

我们知道Topic是由Partition组成的,producer将消息写入Partition,每条消息都会被分配一个唯一且不可变的offset。如下图所示:

如果清理策略是delete,那么当满足保存大小或者保存时间的条件时,触发数据清理机制。指定offset之前的消息都将被删除,也就是Delete Retention Point之前的消息,如下图所示:

换句话说,delete策略不会考虑消息的key或value是什么,更不考虑有没有相同的Key消息存在。而compact策略则会考虑同一分区内的相同key的消息,最终只保留相同key的消息中最新的value对应的消息。如下图所示,原始数据中K1有三条消息,经过compact处理,只保留K1:V4这一条消息。个人感觉,该过程称为compact并不是很贴切,应该称为update之类的。

Compact策略适用于只想保留当前快照而不是完整修改历史的场景。比如:为了保存员工工资信息,可以创建主题employee-salary且设置compact策略,如下图所示:

Compact关键保证

代码语言:javascript
复制
1.不影响没有消费延迟的consumer获取所有消息。换句话说,compact只会操作非active segment,而没有消费延迟的consumer正在消费active segment。    

2.Compact不会改变消息的offset值、key值、partition值、前后顺序。只是删除一些消息。

3.在log.cleaner.delete.retention.ms(默认24H)时间内,消费者仍能消费到待删除的消息。 

除了正常携带key和value的消息之外,compact还有一种特殊消息:key正常但value=null,这种消息称为tombstone消息。tombstone消息进行合没有意义,所以compact会删除这类消息。欢迎关注公众号:大数据从业者

Compact配置

代码语言:javascript
复制
log.cleanup.policy:清理策略(delete或compact或delete,compact)

log.cleaner.enable:是否启用compact清理任务

log.cleaner.threads:compact清理任务的线程数

log.segment.bytes:segmemnt文件的最大字节

log.segment.ms:segment保持active的最大时间

log.cleaner.backoff.ms: 清理任务闲时休眠时间

log.cleaner.min.compaction.lag.ms:触发compact的最小延迟时间

log.cleaner.max.compaction.lag.ms:触发compact的最大延迟时间

log.cleaner.dedupe.buffer.size:清理任务线程用于去重的内存

log.cleaner.delete.retention.ms:compact删除消息延迟删除时间    

log.cleaner.io.buffer.load.factor:清理任务线程 IO buffer负载率

log.cleaner.io.buffer.size:清理任务线程IO buffer内存

log.cleaner.io.max.bytes.per.second:清理任务线程IO限速

log.cleaner.min.cleanable.ratio:触发compact的脏数据比例

实践案例

1.创建测试主题

为便于测试,设置min.cleanable.dirty.ratio=0.001、segment.ms=5000以保证compact清理任务尽快执行,设置partitions=1以保证测试消息写入相同分区。

2.描述测试主题

3.启动消费者

4.启动生产者,发送测试消息

消息内容故意加入重复key,如下:

代码语言:javascript
复制
Patrick,salary: 10000

Lucy,salary: 20000

Bob,salary: 20000

Patrick,salary: 25000

Lucy,salary: 30000

Patrick,salary: 30000

5.查看第3步消费消息

可以看到消费到所有消息,证实上文Compact关键保证之一:不影响没有消费延迟的consumer获取所有消息。

6.等待一分钟,继续生产消息,如:Stephane,salary: 0

7.启动新的消费者

可以看到,经过compact清理,上述第4步发送的重复消息只保留最新value。

源码剖析

KafkaServer.startup会启动LogManager,LogManager.startup会启动一个Schedule线程池和一个LogCleaner(内部启动CleanerThread)。Schedule线程池中有一个任务为kafka-log-retention,对应于delete清理策略;而LogCleaner对应于compact清理策略。

本文只讲述compact相关的LogCleaner,其startup方法如下:

代码语言:javascript
复制
/**

   * Start the background cleaning

   */

  def startup() {

    info("Starting the log cleaner")

// cleaner线程数通过参数log.cleaner.threads配置,默认为1

    (0 until config.numThreads).foreach { i =>

      val cleaner = new CleanerThread(i)

      cleaners += cleaner

      cleaner.start()

    }

  }

接下来主要看下CleanerThread类主流程,位置在LogCleaner.scala文件。

主流程如下:

代码语言:javascript
复制
doWork -> cleanFilthiestLog ->  grabFilthiestCompactedLog -> cleanLog -> clean -> doClean

篇幅有限,doClean方法内容不再介绍。感兴趣请自行阅读。如下:

代码语言:javascript
复制
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala#L594

另外LogCleaner提供metric,方便问题排查和性能调优,如下所示:

总结

通过阅读本文,可以掌握compact原理、配置、实践操作、源码分析等内容。至此,Kafka Compact Topic使用与调优轻松拿捏!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据从业者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Compact原理
  • Compact配置
  • 实践案例
  • 源码剖析
  • 总结
相关产品与服务
腾讯云服务器利旧
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档