摘要:本文总结了 Dinky 社区在 Doris Summit 2022 上分享的《Dinky 在Doris实时整库同步和模式演变的探索实践》,其分享主要分为四个章节,内容包括:
Tips:历史传送门~
《Doris + Flink + DolphinScheduler + Dinky 构建开源数据平台》
GitHub 地址
https://github.com/DataLinkDC/dinky
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~
一、整库入仓的需求与挑战
关于数据入仓的需求起源,我们先来了解两个概念:ETL 和 ELT。
ETL 是用来描述将数据从源端经过抽取、转换、加载至目的端的过程,通常其数据处理依赖计算引擎的能力,一个ETL任务对应一个业务查询需求;ELT 则是将源端数据采集到一个具备较强运算能力的数据库,然后发挥数据库自身的并行计算能力来满足各种查询需求。
随着业务需求日益增多、计算逻辑更加复杂,通过 ETL 来处理时,需要开发大量的 ETL 任务并且管理大量的结果表,而新的业务需求则需要开发新的 ETL 任务,开发运维成本巨高,所以基于 ELT 的数据入仓建设尤为重要。除了可以降低上述的建设成本,还可以减少对业务库的侵入,提供高效的 OLAP 分析能力。
传统数据入仓架构1.0
我们可以先来看看传统数据入仓的架构。
在早期的数据入仓架构中,一般会每天 SELECT 全量数据导入数仓后再做离线分析。这种架构有几个明显的缺点:
传统数据入仓2.0
到了数据仓库的 2.0 时代,数据入仓进化到了 Lambda 架构,增加了实时同步导入增量的链路。
整体来说,Lambda 架构的扩展性更好,也不再影响业务的稳定性,但仍然存在一些问题:
CDC 入仓架构
随着计算引擎和 MPP 数据库的发展, CDC 数据入湖架构,可分为两个链路:
· 有一个全量同步 Spark 作业做一次性的全量数据拉取;
· 还有一个增量 Spark 作业通过 Canal 和处理引擎将 Binlog 数据准实时地同步到 Doris 表中。
这个架构虽然利用了 Doris 的更新能力,无需周期性地调度全量合并任务,通过 Spark 能做到分钟级延迟。但是全量和增量仍是割裂的两个作业,全量和增量的切换仍需要人工的介入,并且需要指定一个准确的增量启动位点,否则的话就会有丢失数据的风险。可以看到这种架构是流批割裂的,并不是一个统一的整体。
FlinkCDC 实时入仓架构
对于上述架构存在的问题,Flink CDC 的出现为数据入仓入湖架构提供了一些新思路。借助 Flink CDC 技术的全增量一体化实时同步能力,结合 Doris 提供的更新能力,整个架构变得非常简洁。我们可以直接使用 Flink CDC 读取 MySQL 的全量和增量数据,并直接写入和更新到 Doris 中。
这种简洁的架构有着明显的优势。首先,不会影响业务稳定性。其次,提供分钟级产出,满足近实时业务的需求。同时,全量和增量的链路完成了统一,实现了一体化同步。最后,该架构的链路更短,需要维护的组件更少。当然,还可以发挥 Doris 极速易用的查询能力。
FlinkCDC 整库入仓挑战
那 FlinkCDC 实时入仓又有哪些痛点和挑战呢?或者我们可以理解为 FlinkCDC 在整库入仓具备哪些挑战。
整库入仓一大特点是表数目多、字段多,那基于 Flink CDC 需要开发和运维的任务也会多。自然会带来手工字段映射易出错、开发大量 FlinkSQL 等问题,而且大量的 INSERT 语句会占用大量的数据源连接数,重复读取 Binlog,这对业务库和网络的影响都是非常大的。
FlinkCDC 模式演变挑战
此外,在整个实时入仓过程,用户还比较关注一点是自动模式演变。
一般在源库表结构发生变动时,如图所示源库表新增列 age,但目标端无法同步新增,且 Flink 任务的计算逻辑无法变更,导致无法将新增列的数据写入目标端,造成任务异常。那如何实现表结构变更自动同步及新列数据自动同步呢?这也是 FlinkCDC 整库模式演变的挑战。
二、基于 Flink 的实时计算平台
Dinky 是一个基于 Apache Flink 的实时计算平台,它具备开箱即用、易扩展等特点,可以用来连接常见的 OLAP 和数据湖等框架,正致力于流批一体与湖仓一体的探索实践。
Dinky 发展历程
Dinky 目前历经一年半的发展,从个人项目到社区项目,从 Dlink 到 Dinky,期间获得了众多业界老师的认可与支持。在鼓励之下,社区开始在各个开源大会上分享自己的实践经验,希望可以减少用户建设实时数仓的成本。
Dinky 主要功能
Dinky 具备实时数据开发和任务运维两大业务功能。
在数据开发中提供了用户在生产中常用的一些辅助功能,如 Flink SQL 自动提示与补全、语法校验、调试查询、血缘分析、Catalog 管理、Jar 任务提交、UDF 动态加载、全局变量、执行环境、语句生成和检查点托管等功能,配合资源管理来协作完成各类数据开发需求,改善用户的开发体验。
在任务运维中主要是对 Flink 任务和集群的监控与报警,同时记录各 Flink 实例的 Metrics,做到统一管理。
在最新的版本里也提供了对企业级功能的支持,如多租户、角色权限等。
Dinky 基于 Flink 的数据平台的定位,也促使其可以很好的融入各开源生态,如 Flink 各类衍生项目、海豚调度、Doris 和 Hudi 等数据库,进而来提供一站式的开源解决方案。
Dinky 核心优势
那 Dinky 相对于其他平台,具备哪些优势呢?
首先,它兼容 Flink 1.11 及以上版本,扩展新版本支持的成本非常低,也可以扩展用户自身二开的 Flink。
而在 FlinkSQL 方面,它扩展了 FlinkSQL 的一些额外语法以便于企业用户可以低成本使用,如全局变量、整库同步等。
在扩展性上,可以基于 SPI 等方式快速扩展 Flink 定制任务、整库同步、Catalog 管理、数据源、报警方式等功能,以让企业用户可以基于此框架来低成本实现自身业务。
在开源生态圈里,支持刚刚所提到的各类开源项目,而更多的生态项目也正逐步支持。
此外它可以通过近一站式的数据开发模式,包含 Flink SQL、Jar、UDF 等的开发调试运维,未来也将计划支持 DataStream 的在线开发运维。
最后正如其名 Dinky 一样小巧精致,基于 Flink+React+SpringBoot 开发,架构简单且部署方便,易于企业用户进行定制改造或集成到其他平台。
Doris 在 Dinky 中的应用——数据控制台
首先是 Doris 数据控制台,Dinky 提供了数据源注册和执行 SQL 的能力,可以很便捷地获取 Doris 的元数据信息,如图在描述选项卡中可以查看 Doris 表和字段的元数据信息,在数据查询选项卡可以快速自助查询 Doris 表中的数据,SQL 生成选项卡则可以一键生成 Flink CREATE TABLE 语句及其它 SQL 语句等。最后的控制台则可以执行 Doris 的 SQL 语句,也执行 Doris 自身特殊的语句,如图通过 show backends 来查看 be 的信息。相较于 Doris 自带的 web 控制台,它支持元数据提示与补全能力。
Doris 在 Dinky 中的应用——作业管理
在 Dinky 中支持创建和开发 Doris 类型的作业,可以对 Doris 数据源执行 Doris SQL 语句及查询其元数据信息,也支持如图所示的字段级血缘分析的展现,当然也可通过图表功能快速将 Doris 查询结果数据可视化。
Doris 在 Dinky 中的应用—— FlinkSQL 读写
Dinky 的优势是对 Flink SQL 任务开发与运维全面支持,在 Flink SQL 任务中,可以使用 Doris Connector 来高效查询或写入 Doris 数据库。
首先是离线查询,即对 Doris 进行有界的流查询或者直接使用批查询来读取 Doris 中的数据,通过 doris.filter.query 参数可以利用 Doris 自身极速的查询引擎提前过滤数据,来将两者各自的优势很好地融合在一块。
然后是离线写入,即可以使用 FlinkSQL 以离线的方式将数据按批次写入 Doris 中,写入支持数据更新。
最后是实时更新,通过 FlinkCDC 和 Flink SQL 将数据库日志或流数据实时处理并写入 Doris 数据库,支持 Exactly once 语义。
Doris 在 Dinky 中的应用—— FlinkCDC 整库入仓 Doris
Dinky 实现了 FlinkCDC 整库入仓入湖的能力并对其进行了性能和成本优化。所以可以直接通过一句 SQL 来实现整库实时入仓 Doris。
目前对 Doris 支持四种类型的作业,实现区分是 FlinkSQL 和 DataStream,功能区分主要是只数据写入、数据及元数据写入、数据写入及模式演变。
三、FlinkCDC 实时整库入仓
那接下来将重点介绍 Dinky 在 FlinkCDC 整库入仓 Doris 的实现及优化细节。
之前我们已经分析了当前阶段 Flink CDC 整库入仓面临着手工字段映射易出错、开发大量 FlinkSQL、占用大量连接数、Binlog 重复读取的挑战。
我们可以站在用户的角度想一想,数据整库入仓这个场景用户到底想要的是什么呢?
我们可以先把中间的数据集成系统看成一个黑盒,用户会期望这个黑盒提供什么样的能力来简化入仓的工作呢?
· 首先,用户肯定想把数据库中全量和增量的数据都同步过去,这就需要这个系统具有全增量一体化、全增量自动切换的能力,而不是割裂的全量链路 + 增量链路。
· 其次,用户肯定不想为每个表去手动映射 schema,这就需要系统具有元信息自动发现的能力,省去用户在 Flink 中创建 DDL 的过程,甚至帮用户自动在 Doris 中创建目标表。
· 另外,用户还希望源端表结构的变更也能自动同步过去,不管是加列减列和改列,还是加表减表和改表,都能够实时的自动的同步到目标端,从而不丢失任何在源端发生的新增数据,自动化地构建与源端数据库保持数据一致的 ODS 层。
· 更重要的是,还需要有具备生产可用的整库同步能力,不能对源端造成太大压力,影响在线业务,即只使用一个连接数。
上述四个核心功能基本组成了用户理想中所期待的数据集成系统,而这一切如果只需要一行 SQL,一个Job就能完成的话,那就更完美了。我们把中间的这个系统称为 “全自动化数据集成”,因为它全自动地完成了数据库的入仓入湖,解决了目前遇到的几个核心痛点。而且目前看来,Flink 是实现这一目标非常适合的引擎。
接下来我们分析下这五点如何实现。
· 首先,对于全增量自动同步,Flink CDC 已经通过“增量快照读取算法”实现了全增量无锁读取和自动切换的能力,这也是 Flink CDC 的亮点之一。
· 在元数据的自动发现上,可以通过 Flink 的 Catalog 接口无缝对接上,也可以通过数据源链接来获取元数据信息,Dinky 内置了数据源注册及元数据查询功能,可以自动发现 MySQL 等数据源中的表和 schema。
· 在表结构变更的自动同步方面,FlinkCDC 支持获取 Schema 变动数据,但 FlinkSQL 任务无法将其演变到目标数据源,需要借助目标 Sink 自身来实现 Schema Evolution。
· 在整库同步只用一个连接方面,FlinkCDC 的 DataStream 可以只创建一个连接来读取整库的变动数据和结构变更,而 FlinkSQL 只能读取一个表的变动数据。
· 最后关于一行SQL部署整个作业,可以通过 StatementSet 把所有 insert 语句合并为一个大作业,但仍占用大量连接数和重复读取 Binlog。可行的一种思路是通过 DataStream 对一个 Source 进行分流,分别写入对应的 Sink。
于是整库入仓 Doris 的实现思路可以总结为三步:
通过引入类似于 CDAS 语法,一行 SQL 语句就能完成整库同步作业的定义,并且实现了 source 合并的优化,减轻对源端数据库的压力。
CDAS 是基于 Catalog 的语法,而 Dinky 自身实现了类似的语法——CDCSOURCE,区别于 CDAS,它无需借助 Catalog 即可使用。
CDCSOURCE 也会解析成一个 Flink 作业执行,可自动解析配置参数,将指定的一个或多个数据库的数据全量+增量同步到下游任意数据源,也支持分库分表的同步。
如图所示是 CDCSOURCE 的基本原理,将 FlinkCDC DataStream Source 中获取的变动数据的序列化字符串解析为 Map,根据 Map 的元数据信息将数据分发到对应的 OutputTag,之后通过 FlatMap 转换为 RowData 供 Sink 写入到目标数据源。看似非常简单,但实现的细节和难点也很多,接下来我们详细了解下 Dinky 的源码实现。
此处的多源合并可以理解为是 Source 合并优化。Dinky 实现 Source 合并优化的思路非常简单,不同于 CDAS,Dinky 是直接通过 FlinkCDC DataStream 中的 Source 来获取整个数据源的变动数据,后续的所有数据处理都基于该 Source 进行。
如图源码所示,通过 Flink DataStream API 来构建了 MysqlSourceBuilder,其用到的参数均由 CDCSOURCE 的参数传递,主要是数据源链接配置以及要采集的库表,当然也可传递更高级的配置如 debezium 配置等。
当建立的数据库连接过多时,如左图所示,最直观地是创建了很多 FlinkCDC 的 Table,每一个 Table 会占用一个数据源连接数,同时 Binlog 重复读取会造成源库的巨大压力。
而 Dinky 的 CDCSOURCE,如右图所示,它会合并同一作业中的 Source,如果都是读的同一数据源,则会被合并成一个 Source 节点,这时数据库只需要建立一个连接,binlog 也只需读取一次,实现了整库的读取,降低了对数据库的压力。
根据元数据信息来构建侧输出流来对上一步多源合并的变动数据进行过滤分流。
从核心源码看,首先需要遍历元数据的所有表并通过正则将分库分表名转换为汇总库表名来提前构建对应的 OutputTag,将多源合并的 MysqlSource 输出的 DataStream<String> 解析为 Map 对象,然后通过 process 底层接口构建过滤分流的算子。在过滤分流的逻辑里主要分为两步,第一步是将分库分表的事件流过滤和并为其目标表的一个汇总事件流,第二步是在将该汇总事件流转变为之前创建的侧输出流进行旁路输出。
在构建分库分表的旁路输出时,通过正则表达式来匹配事件流中元数据信息的库表名,将符合目标表正则表达式的事件流合并到目标表的侧输出流。
事件流中的数据是 Debezium 的 JSON,如右上图所示,在其 source 属性下包含了此变动事件的元数据信息,对于 Mysql 来说主要用到 db 和 table 两个属性,db 对应 Mysql 的库名,table 则是 Mysql 的表名,将二者进行正则匹配,从上一页提到的源码可见,是匹配后返回了目标表的库表名,再通过库表名来选择此前创建好的侧输出流进行旁路输出。
事件流被汇总到侧输出流后,一般便是 Sink 环节了。对于 Sink,有两种实现方式,分别是使用 SQL/Table API 和 DataStream API。此处我们先来讲顶层的 SQL/Table API 实现思路。
第一步,先通过 DataStream 的 flatMap 方法将 Map 中的事件流转换为带有 RowKind 的流数据;
第二步,将 DataStream 中的流数据在 Temporary View 中注册,供顶层 API 直接查询;
第三步,根据元数据信息和 CDCSOURCE 语句的 sink 配置模板来生成每个目标表的 CREATE TABLE 语句并且执行,即在内存 Catalog 中注册目标表;
第四步,根据元数据信息来生成每个目标表的 INSERT 语句,然后通过 Parser 来获取对应的 Operations;
第五步,将所有的 Operations 合并为包含整库所有任务的 Operations,进行作业提交。
在第一步将事件流转换为流数据时,是依赖如右上图 Debezium JSON 的 before 和 after 以及 op 属性。before 是变动数据的原始内容,after 为变动数据的最新内容,op 则是本次变动事件的更新状态,主要有 r、c、u、d 四种情况,分别对应全量扫描、新增、更新、删除事件。
在 FlatMap 中对不同事件进行不同的处理,全量扫描和新增事件直接取最新数据转换为 INSERT 类型的流数据;删除事件则直接取原始数据转换为 DELETE 类型的流数据;更新事件需要两步,先把原始数据转换为 UPDATE_BEFORE 类型的流数据,再把最新数据转换为 UPDATE_AFTER 类型的流数据。
其源码实现也非常简单,主要是在 FlatMap 算子中根据 Map 中的 op 属性值进行分支处理,分别构建刚刚讲到的对应事件类型的 Row,同时进行数据类型的转换,然后写入 Collector 中即可。
最后一步是通过 MetaData 来构建整个任务的 Modify Operations。
首先将 DataStream 通过 createTemporaryView 转变为视图数据,再根据 MetaData 来生成目标表的 CREATE TABLE 语句,然后在 TableEnvironment 中执行将其注册到 Catalog 中。之后是根据 MetaData 来生成目标表的 INSERT 语句,通过 TableEnvironment 的 Parser 来解析 INSERT 语句获取 Operation 列表。由于单个 INSERT 语句只有一个 Modify Operation,所以数组长度为 1,将该 Modify Operation 都汇总到一个 Operation 列表,这就是基于 SQL/Table API 的整库同步任务完整的 Operation。
接下来我们讲讲 DataStream API 的 Sink 实现思路。
区别于 Table API,DataStream 在 FlatMap 中将事件流转变为流数据时,是转变成带有 RowKind 的 GenericRowData 数据。然后再后续 Sink 阶段,则是直接依据 MetaData 来构建每个表的 DorisSink。在构建DorisSink 时,字段配置通过 MetaData 的列信息映射,外加隐藏列构建,其他配置通过解析 CDCSOURCE 语句传递的 sink 参数进行设置。
Dinky 中也实现了支持元数据同时写入 Doris 的 Sink 实现。
主要是 DataStream 在 FlatMap 中将事件流的业务数据与元数据信息转变为流数据,如左图所示,从事件流 Map 中的元数据信息提取对应数据然后追加到流数据里。
以上就是 Dinky 的 CDCSOURCE 实现的具体思路。
四、FlinkCDC 实时模式演变
此外,还有一个用户比较关切的问题,如何在整库同步中实现自动模式演变。
我们再来回顾下模式演变的挑战,在源库表结构发生变动时,如新增列 age,但目标端无法同步新增,且 Flink 任务的计算逻辑无法变更,导致无法将新列的数据写入目标端,造成任务异常。那如何实现表结构变更自动同步及新列数据自动同步呢?接下来会分享下目前阶段我们的一些探索经验。
Light Schema Change 是 Doris 最新的一种在线进行加减列或修改列的实现方案,相对于其之前支持的 3 种 Schema Change 方式,Light Schema Change 具备解决 Schema 不一致问题、全局 Schema Cache、支持物化视图、解决数据重写问题的优势,由于其只修改了 FE 的元数据,通过对 BE 读写流程进行修改来支持获取正确的 Schema 信息,性能便达到毫秒级别,这也为在实时整库同步时同步变更 Schema 提供了基础。
在 Doris 最新 1.2 版本创建表时开启 Light Schema Change 即可使用该特性。
此前 Doris 旧版本在处理 Flink 的模式演变时,通常会由于 Doris 进行 Schema Change 的成本较高,较高的耗时期间无法写入数据,直接导致阻塞上游数据,造成数据积压。此外还需要人工解析并进行 Doris Schema 的更新维护,重启 Flink 作业来构建最新的执行计划。
当我们使用 Doris 1.2 版本,开启 FlinkCDC 的 schame.changes 参数,开启 Doris 表的 Light Schema Change 参数时,可以通过最新版本的 Doris 连接器实现自动识别 DDL 操作并毫秒级执行完成,避免双写和阻塞数据的问题,自动序列化,无需关心 Schema 变动,即无需重启 Flink 作业。
最新版本的 Doris 连接器支持直接接收 CDC 产生的 JSON 字符串数据,然后自动解析数据并写入目标表,且支持解析引起 Schema 变更的 DDL 语句并且进行自动执行来实现模式演变。这让 Dinky 实现整库同步加模式演变更加容易,只需要在构建 OutputTag 时直接将 Debezium JSON 的 Map 序列化为 String 来输出,根据 CDCSOURCE 语句的 sink 配置来构建每个目标表的 DorisSink,无需关心 Schema。如图所示为最新版本的 Doris Sink 构建过程,省去 Schema 等配置,更加简洁。
最终呢,我们通过 Dinky 的一句 CDCSOURCE 的语句,便可以完成整库实时入库 Doris,且支持一定的模式演变能力。对于之前讲到的全增量自动切换、元数据自动发现、表结构变更自动同步、整库同步只用一个连接、一行 SQL 部署整个作业这个五个用户诉求的功能基本实现。
五、未来展望与计划
当然,目前该方案还存在一定的问题,待后续持续跟进优化。比如,
Doris light_schema_change 配置只能在新建表时指定,已有的表不能修改;
Doris 连接器只支持新增和删除列操作;
Doris 连接器不支持表级模式演变,如新建表;
Doris 连接器的 DDL 识别与转换只支持 MySQL,其他数据源兼容性有待提升;
Doris 连接器要求库名和表名必须与源库保持一致。
未来计划
未来呢,我们也将持续推进 Doris 整库同步与模式演变的探索与优化,争取可以为用户提供一个完善的解决方案。此外我们也会支持更多数据源类型的模式演变,目前发现该工作与数据源自身及其连接器能力有直接关系,
最后也会不断探索更多 Doris 在 Dinky 中的应用能力,为大家待来更多的开源实践分享。