前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka事务剖析

kafka事务剖析

作者头像
陈猿解码
发布2023-03-21 13:23:00
3770
发布2023-03-21 13:23:00
举报
文章被收录于专栏:陈猿解码陈猿解码

【kafka事务简介】

在kafka的0.11版本中,引入了kafka事务的特性,确保在一个事务中发送的多条消息,要么都成功,要么都失败。这里说的多条消息可以是发送给不同topic的多个消息。

kafka事务机制的使用,更多的是在流式计算中,从消息源获取消息,然后通过一系列计算最后发送到kafka的场景中,保证每条消息恰好被计算一次,从而实现输入输出端到端的Exactly Once的语义。

【事务的使用与原理】

1. 生产者使用事务

对于生产者而言,其使用方式还是比较简单的,首先进行事务初始化,然后开始执行事务,发送消息,最后提交事务即可。

代码语言:javascript
复制
// 1. 初始化事务
producer.initTransactions();
// 2. 标记开始事务
producer.beginTransaction();
// 3. 消息发送
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
// 4. 提交事务
producer.commitTransaction();

producer.close();

事务生产的具体流程如下图所示:

1)生产者进行事务的初始化,这里主要是向服务端发送生产者事务ID,该ID在服务端唯一。如果不同的生产者使用了同一个事务ID,在服务端会关闭处于正在进行但还未进行提交的事务,同时服务端会对epoch进行递增,后续的事务请求都必须带上该epoch,以标记事务的执行者,防止并发操作出现问题。

需要注意的是:事务初始化的请求是发送给事务coordinator对应的broker(kafka内部采用名为__transaction_state的topic记录事务的信息,与消费者组的方式类似,事务ID编码后的hash值就是该事务记录的分区,分区leader所在的broker也就是该事务coordinator对应的broker),因此请求发送前会有一个查找coordinator的交互流程。

2)在完成事务初始化后,随后生产者就是进行消息的发送。在真正进行消息发送前,会给coordinator同步消息发送的topic分区信息,方便coordinator最后进行结束标记的记录。对于消息的发送,服务端的处理逻辑和非事务的处理逻辑是一致的,topic分区leader对应的broker收到消息后按批进行消息的持久化。

3)消息发送完成后,开始向coordiantor进行事务的提交(事务回滚也是同样的流程),服务端对于该请求以两阶段提交的方式进行处理。

a. 首先在__transaction_state的topic中记录事务预提交的日志信息。

b. 然后向事务中记录的所有topic分区(第4步添加的分区信息)发送结束标记请求,对应的broker对该请求的处理就是将标记信息以一个控制记录添加到日志中。

c. 最后再记录事务提交的信息到topic(__transaction_state)中。

到这里一次完整事务就完成了,接下来生产者可以继续新的事务(不需要再初始化)。

2. 消费者消费

从上面的流程可以看出不管事务的提交与回滚,实际发送的消息都已经发送到对应的topic并进行了持久化。这样以来,对于消费者而言,在如果不做任何改动的情况下, 未正确事务提交的消息依旧是可以被消费者读取。因此,如果消费者想要仅消费事务提交的消息,需要加上对"--isolation-level"配置项设置为"read_committed"(默认为"read_uncommitted")。

从kafka消费者的具体代码实现中,可以看到在获取消息后,根据"isolationlevel"的级别进行判断与该消息是否为事务发送的消息来进行相应的处理。

至于怎么判断一批消息是否处于事务中,这个是由生产者发送的消息中,在RecordBatch的header中添加了生产者事务ID(参考《kafka的消息持久化文件》中的图),以此可以区分消息是否处于事务中。

另外,从持久化的日志中(下图所示是在一次事务中发送了5条消息)也可以看出来:

这里有一点需要注意:

事务结束标志的日志是一个特殊的,称为control(控制)的消息。controler消息当前只有commit和abort两类。此外,虽然该类型的消息不会发送给消费者,但实际会占用一个偏移量。

3. 服务端的事务状态记录

上面流程中的coordinator,想必大家应该能联想到消费者组中也有一个coordinator。两者在工作机制上是没有太大区别的,每个生产者事务都有一个对应的coordinator负责相关的请求处理,在每个coordinator内部也都有一个状态机记录事务的状态变化。

每个请求事件触发后的状态信息,最终都以消息日志的形式记录到名为"__transaction_state"的topic中。

【总结】

本文简单介绍了kafka中生产者事务的相关原理,实际使用时,还有很多注意事项,例如,需要注意事务的超时时间(超时无状态变更会自动回滚),事务ID的唯一性问题(防止并发操作出现问题),以及各种异常情况(对应二阶段提交时的各种异常)等。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-03-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 陈猿解码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档