在实时数仓分层中,Kafka是一种比较常见的中间存储层,而在分布式计算中由于硬件、软件等异常导致的任务重启是一种正常的现象,通过之前的Kafka-Consumer分析得知,offset 是跟随着checkpoint周期性的保存, 那么消息是有可能被重复消费的,而Kafka 作为输出端并不属于整个Flink任务状态的一部分,重复被消费的消息会重复的输出,因此为了保证输出到Kafka数据的一致性,Flink 在Kafka Sink端的事务语义。本篇主要介绍Kafka-Sink 的执行流程与核心设计。
幂等
在通常情况下,生产者发送数据可能由于网络等原因导致数据重复发送, 常见的解法就是幂等操作, 也就是执行多次相同的操作与其执行一次的影响结果是一样的。Kafka 不像MySQL/HBase 这样存储可以通过uniqueKey或者RowKey 机制来保证幂等, 为了实现幂等引入了两个概念producerId与sequenceNumber, 每一个producer 都会有一个由服务端生成的producerId与之对应,sequenceNumber 是partition级别的自增消息序列号,客户端每一条消息都会对应生成一个sequenceNumber,在服务端同样会保存该sequenceNumber, 只有当客户端消息的sequenceNumber 大于服务端存储的sequenceNumber 该消息才会被接受,通过这种方式保证消息的幂等性,从而保证数据的一致性。
但是对于幂等消息有个重要的问题:不能跨topic 、跨partition 保证数据一致性,如果producer 生产的消息横跨多个topic、partition, 可能会存在部分成功,部分失败的情况;另外幂等只是在单次producer 会话中, 如果pruducer 因为异常原因重启,仍然可能会导致数据重复发送。因此引入了事务解决该问题。
事务
事务要求遵循原子性,即要么成功要么失败,为了应对跨topic、跨partition问题,kafka引入了TransactionCoordinator 事务协调者,由该协调者协调事务的提交与回滚操作,同时引入了_transaction_state 日志来持久化事务信息(与事务相关的topic、partition、producer等), 其本质也是一个topic, TransactionCoordinator 通过_transaction_state 日志信息来恢复或者取消事务。
为了能够跨producer会话,提供了一个transactionId 的概念, 由客户端指定,能够保证producer重启时仍然能够找到对应的producerId (也就是你是你), 从而继续完成事务。transactionId与producerId 同样也会保存到__transaction_state 中。
前面分析了kafka-producer 幂等与事务相关的原理, 其可以保证单producer在跨topic、partition下的数据一致性,但是在Flink中是一个分布式的计算环境,多并发下会有多个producer 生产数据, 那么需要保证的是多个producer下的数据一致性。
通过之前对Flink的了解,Flink提供了基于checkpoint 下的两阶段提交流程(flink exectly-once系列之两阶段提交概述) ,该流程可以保证全局一致性的事务, 那么只需要将KakfkaProducer 的两阶段提交与Flink checkpoint提交融合起来即可实现。接下来看具体的融合逻辑:
左侧为正常事务的提交(以客户端的视角)流程,右侧为checkpoint 略缩版流程, 那么现在需要将这两部分逻辑融合起来:
Flink中将两阶段提交做了一个抽象 TwoPhaseCommitSinkFunction,其实现了CheckpointedFunction与CheckpointListener这两个与checkpoint流程相关的两个接口,提供了以下几个主要的抽象方法:
让使用者只需要实现这几个方法即可。那么接下来看在flink 的执行流程去看是如何调用这几个方法的:
从上面分析来看整个流程是比较简单的, 重点就在于如何做异常处理,面对可能会出现异常的情况做好检查点以便恢复处理。而对于FlinkKafkaProducer 的实现只需要继承TwoPhaseCommitSinkFunction 类,并且重写上面提到的几个抽象方法即可:
本篇主要从事务角度介绍了Kafka 事务实现与FlinkKafkaProducer事务的实现。