【kafka事务简介】
在kafka的0.11版本中,引入了kafka事务的特性,确保在一个事务中发送的多条消息,要么都成功,要么都失败。这里说的多条消息可以是发送给不同topic的多个消息。
kafka事务机制的使用,更多的是在流式计算中,从消息源获取消息,然后通过一系列计算最后发送到kafka的场景中,保证每条消息恰好被计算一次,从而实现输入输出端到端的Exactly Once的语义。
【事务的使用与原理】
1. 生产者使用事务
对于生产者而言,其使用方式还是比较简单的,首先进行事务初始化,然后开始执行事务,发送消息,最后提交事务即可。
// 1. 初始化事务
producer.initTransactions();
// 2. 标记开始事务
producer.beginTransaction();
// 3. 消息发送
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
// 4. 提交事务
producer.commitTransaction();
producer.close();
事务生产的具体流程如下图所示:
1)生产者进行事务的初始化,这里主要是向服务端发送生产者事务ID,该ID在服务端唯一。如果不同的生产者使用了同一个事务ID,在服务端会关闭处于正在进行但还未进行提交的事务,同时服务端会对epoch进行递增,后续的事务请求都必须带上该epoch,以标记事务的执行者,防止并发操作出现问题。
需要注意的是:事务初始化的请求是发送给事务coordinator对应的broker(kafka内部采用名为__transaction_state的topic记录事务的信息,与消费者组的方式类似,事务ID编码后的hash值就是该事务记录的分区,分区leader所在的broker也就是该事务coordinator对应的broker),因此请求发送前会有一个查找coordinator的交互流程。
2)在完成事务初始化后,随后生产者就是进行消息的发送。在真正进行消息发送前,会给coordinator同步消息发送的topic分区信息,方便coordinator最后进行结束标记的记录。对于消息的发送,服务端的处理逻辑和非事务的处理逻辑是一致的,topic分区leader对应的broker收到消息后按批进行消息的持久化。
3)消息发送完成后,开始向coordiantor进行事务的提交(事务回滚也是同样的流程),服务端对于该请求以两阶段提交的方式进行处理。
a. 首先在__transaction_state的topic中记录事务预提交的日志信息。
b. 然后向事务中记录的所有topic分区(第4步添加的分区信息)发送结束标记请求,对应的broker对该请求的处理就是将标记信息以一个控制记录添加到日志中。
c. 最后再记录事务提交的信息到topic(__transaction_state)中。
到这里一次完整事务就完成了,接下来生产者可以继续新的事务(不需要再初始化)。
2. 消费者消费
从上面的流程可以看出不管事务的提交与回滚,实际发送的消息都已经发送到对应的topic并进行了持久化。这样以来,对于消费者而言,在如果不做任何改动的情况下, 未正确事务提交的消息依旧是可以被消费者读取。因此,如果消费者想要仅消费事务提交的消息,需要加上对"--isolation-level"配置项设置为"read_committed"(默认为"read_uncommitted")。
从kafka消费者的具体代码实现中,可以看到在获取消息后,根据"isolationlevel"的级别进行判断与该消息是否为事务发送的消息来进行相应的处理。
至于怎么判断一批消息是否处于事务中,这个是由生产者发送的消息中,在RecordBatch的header中添加了生产者事务ID(参考《kafka的消息持久化文件》中的图),以此可以区分消息是否处于事务中。
另外,从持久化的日志中(下图所示是在一次事务中发送了5条消息)也可以看出来:
这里有一点需要注意:
事务结束标志的日志是一个特殊的,称为control(控制)的消息。controler消息当前只有commit和abort两类。此外,虽然该类型的消息不会发送给消费者,但实际会占用一个偏移量。
3. 服务端的事务状态记录
上面流程中的coordinator,想必大家应该能联想到消费者组中也有一个coordinator。两者在工作机制上是没有太大区别的,每个生产者事务都有一个对应的coordinator负责相关的请求处理,在每个coordinator内部也都有一个状态机记录事务的状态变化。
每个请求事件触发后的状态信息,最终都以消息日志的形式记录到名为"__transaction_state"的topic中。
【总结】
本文简单介绍了kafka中生产者事务的相关原理,实际使用时,还有很多注意事项,例如,需要注意事务的超时时间(超时无状态变更会自动回滚),事务ID的唯一性问题(防止并发操作出现问题),以及各种异常情况(对应二阶段提交时的各种异常)等。