
【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失
【Flink】第五篇:checkpoint【1】
【Flink】第十四篇:LSM-Tree一般性总结
接上篇对于Redis Connector的保序的思考后,在自研Connector中,关于数据一致性还有一个重要的点需要考虑,即如何保证数据投递的语义:
当然不需要全实现,而是根据场景及外部存储的特点,实现满足需求的数据投递语义能保证数据一致性的目的即可。
对于以上三种语义,精确一次是最严格的,所以精进一法,则万法皆通。
对于精确一次的语义可以先参考我之前的两篇文章有理论上的储备,接着我将再从现有的典型connector着手即可窥探其中奥义。
先将重要的结论贴出来:
flink的数据一致性需要source和sink满足以下两个条件:
体现在kafka connector中即,


这是支持事务的真正端到端精确一次的kafka connector,如果2PC期间失败了,事务是没有提交成功的,所以也等于是没有被下游消费的。
但是还有很多外部存储系统不支持事务的特性,或者说支持事务特性的话效率太低,那么,这个时候就需要将外部存储系统设计成幂等的了!(我们平时使用kafka connector也不会使用它的精确一次,因为在上一个CK成功之前,下游是不会收到这个CK期间处理的数据的,这种为了数据一致性带来的性能损耗是难以忍受的,本质上这也是一种酸碱平衡的问题)
那么,众多的connector是怎么做的呢?我将从jdbc connector的源码中找到答案!
JDBC Connector
从META-INF.services这个SPI文件找到JdbcDynamicTableFactory。
因为我们需要看的是sink端,所以找到createDynamicTableSink方法,取其返回值JdbcDynamicTableSink即为DynamicTableSink的实现类:

再找到提供运行时执行类的方法getSinkRuntimeProvider:

他通过建造者模式build了一个JdbcBatchingOutputFormat:

再看JdbcBatchingOutputFormat的UML结构,这个类即为Flink的RichSink富函数的实现类,所以我们一定能从中找到相关的open、close等打开关闭和执行数据库存数的逻辑:

我们先来看看open的逻辑:

主要逻辑是:启动了一个调度线程池,corePoolSize为1。线程池中调度的线程执行的逻辑是线程安全的执行JdbcBatchingOutputFormat这个当前实例的flush操作。
注意,在flush过程中的异常会设置给以下这个成员变量:
private transient volatile Exception flushException;然后再来看看flush()的逻辑:

而尝试刷写方法attemptFlush很简单,就是调用代理的一个Executor实例执行executeBatch:

说完了在调度线程池中定时执行的flush逻辑,再来看看它又是如何受理每条RowData数据到来的逻辑的:

而addToBatch的逻辑也很简单,同样也是调用代理的一个Executor实例执行addToBatch:

综合以上分析,得出以下结论:
再来看看这个执行者executor做了些什么,JdbcBatchStatementExecutor?

可以看到,这个解扣定义的功能是实现:
Executes the given JDBC statement in batch for the accumulated records即将batch提交给远端数据库。
具体逻辑拿实现类TableBufferReducedStatementExecutor一探究竟:
首先,重点注意这个reduceBuffer,它存储的是经过压缩的changelog类型的有主键的RowData数据:
the mapping is [KEY, <+/-, VALUE>]为什么说是经过压缩呢,Map这种数据结构天然是将key的value压缩,取最后存入的那一条的,所以,当对于一个主键的连续修改只会执行这个batch里最后的那一条修改操作,这样也降低了对数据库的压力。
那么,他是如何将RowData的RowKind进行映射的呢?

可以看到,在添加到batch的时候已经进行了映射,而且这居然是一种upsert的映射,可我们的关系型数据库并不是这样的操作啊,会出错的吧?!
我们继续往下看,

在执行batch中的DML的逻辑中用两个代理类分别执行upsert和delete操作。
找打这两个代理类并行分析后发现,是根据具体的数据库方言,将更新操作包装成了幂等的数据库操作!
例如,MySQLDialect

可以看到ON DUPLICATE KEY UPDATE,这个正是Mysql实现幂等的一种或方案!
简单地说,ON DUPLICATE KEY UPDATE 可以达到以下目的:
向数据库中插入一条记录:若该数据的主键值已在表中存在,则执行更新操作, 即UPDATE 后面的操作。否则插入一条新的记录。
综合以上对Flink jdbc Connector的分析,
1. 根据不同类型数据库,用具体数据库的SQL方言实现幂等方案
2. 在持久化DML到远端数据库过程中有任何异常,在符合设定阈值情况下立即抛出RuntimeException结束掉当前线程
那么为什么要有2.呢?
一旦持久化数据到远端数据库发生异常,如果我们不结束掉当前线程,那么checkpoint就会顺利执行下去(前提是我们不选择实现2PC的逻辑)。结果就是此次CK期间处理的数据在持久化过程中出现了问题,CK还顺利完成了,造成最终的数据不一致!
但是,如果我们妥善的处理这种持久化异常,并且将其暴露出反映给Flink的CK机制,此次CK失败后,就会从上一次成功的CK重新消费并重新持久化这次失败CK期间处理的数据,结果就是数据被再次持久化。并且由于下游是幂等的,所以无论上一次失败的CK是否提交了完整或者是部分数据,结果就是最终都会达到数据一致!
总结
