前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【Apache Doris】Flink Doris Connector 整库同步使用指南

【Apache Doris】Flink Doris Connector 整库同步使用指南

作者头像
一臻数据
发布2024-12-24 15:44:07
发布2024-12-24 15:44:07
47500
代码可运行
举报
文章被收录于专栏:一臻数据一臻数据
运行总次数:0
代码可运行

导读 本文主要分享总结 Flink Doris Connector 整库同步的版本兼容、相关概要、功能特性、最佳实践和常见FAQ。


一、版本说明

从版本 1.4.x 开始,Flink-connector-doris 支持整库同步 MySQL 数据库到 Apache Doris。截至版本 connector-1.6.0,已支持同步多种数据库,包括 MySQL、Oracle、Postgres 和 Sql Server。未来版本将扩展支持 MongoDB 和 DB2。

  • 1.4.0:开始支持整库同步 MySQL。
  • 1.5.0:开始支持同步 MySQL 非主键表,并扩展支持整库同步 Oracle、Postgres 和 SQL Server。
  • 未来版本:计划支持 MongoDB 和 DB2。

二、前提概要

Connector整库同步功能底层依赖的是Flink CDC,因而对于相应关系型数据的支持,请参考Flink CDC官方文档 Flink CDC Overview:

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/cdc-connectors/overview/

1. 功能特性

① 支持整库同步 MySQL、Oracle、PG 和 SQL Server 表DDL及数据到 Doris。具体映射类型和JDBC Catalog 中基本一致,可以参考JDBC Catalog中类型的映射:JDBC - Apache Doris。

② 支持 schema change,例如:add column、drop column、rename column。

③ 支持 开启 single-sink 后动态添加表。

④ 支持上游分库分表自动合并同步到Doris一张表。

⑤ 支持增加表的前后缀名以及指定表的 buckets 数量。

2. 同步任务前准备

① 确保数据源版本是Flink CDC支持的版本

如果使用的关系型数据库非以上版本可能会有兼容性问题,需要和社区人员一起努力解决。

② 确保数据源按照要求配置相关的日志 Flink CDC 底层依赖于Debezium,要确保成功使用Flink CDC捕获数据,首先需要确保在数据源,开启相关日志的配置,如:MySQL配置binlog日志。有关操作可以参考以下相关文档:

  • Flink CDC Connector
    • https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/cdc-connectors/overview/
  • Tutorial :: Debezium Documentation
    • https://debezium.io/documentation/reference/1.9/tutorial.html
  • 实时开发 - CDC 任务数据源权限配置 - 《实时计算用户手册-v4.1.0》
    • https://study.sf.163.com/documents/read/easystream/Develop_cdc_dataSource_permissons.md

③ 在 FLINK_HOME/lib 下添加正确的 Flink CDC 依赖包 要使用flink-connector-doris进行整库同步任务,首先需要在FLINK_HOME 目录下添加connector包以及flink cdc相关的包(如下图所示),对于目前1.4.x~1.6.x最好使用2.4.xflink-sql-connector jar包,否则可能会出现找不到类问题。

注意:

flink-sql-connector-mysql-cdc 与flink-connector-mysql-cdc是有区别的,使用flink-connector-xxx-cdc包会导致缺少相关的依赖报错。实际上flink-sql-connector-xx 是胖包,除了connector的代码外,还把 connector 依赖的所有三方包 shade 后打入,提供给 SQL 作业使用,用户只需要在 lib目录下添加该胖包即可。而flink-connector-xx-cdc 只有该 connector 的代码,不包含其所需的依赖,提供 datastream 作业使用,用户需要自己管理所需的三方包依赖,有冲突的依赖需要自己做 exclude, shade 处理。

三、功能介绍

1. 整库同步语法

代码语言:javascript
代码运行次数:0
复制
<FLINK_HOME>bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-{flink.version}-{connector.version}.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
[--table-suffix <doris-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
--sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]

2. 参数说明

相关具体的参数可以参考Doris官网:Flink Doris Connector - Apache Doris,本文对一些参数的用法加以特殊说明。

① --mysql-conf scan.incremental.snapshot.chunk.key-column="database.table:column,database.table1:column"

  • 自connector 1.5.0开始支持同步无主键表,为同步MySQL非主键表,必须配置 `scan.incremental.snapshot.chunk.key-column`, 其形式如下: `database.table.column`, 多个非主键表之间用`,`隔开。如果非主键表中含有索引,建议最好选择索引列这样能够提高查询速度。使用无索引列将会导致全量阶段多并发同时使用表锁。
  • 选择的分片列需要保证不存在数据的更新操作(比如从 1 更新到 2),如果存在更新操作,则只能保证 At-Least-Once 语义。

② --table-conf

table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"(since 1.5.2)

自connector 1.5.2 开始新增该参数根据正则表达式的顺序指定整库同步时表的buckets数量,如果有的表没有匹配到该参数,则建表时采用buckets auto。

③ --ignore-default-value "true"/"false"(since 1.5.0)

该参数主要是针对原表的schema 设置的default 值,但是插入的为null,如果不设置为true,则同步到Doris的null值将以defaul 填充。1.5.0 该配置只针对mysql有效,1.6.1 增加了对于Oracle,Postgres,Sql Server的兼容。

代码语言:javascript
代码运行次数:0
复制
-- MySQL:
CREATE TABLE `test_sink` (
  `id` int NOT NULL,
  `name` varchar(50) DEFAULT 'test_default',
  `age` int DEFAULT '10000',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

insert into test_sink (id,name,age) values(6, null, 666);
insert into test_sink (id,name,age) values(5, 'zhaoliu', null);

-- 默认导入进Doris:
select * from test_sink;
5,zhaolin,10000
6,test_default,666

--设置了--ignore-default-value,则默认值不会被同步,导入进Doris的数据会和数据源保持一致。
-- Doris:
select * from test_sink;
5,zhaolin,null
6,null,666

④ --use-new-schema-change "true"/"false" (since 1.5.0)

  • 1.6.0 默认打开
  • 自connector 1.5.0 开始支持rename column,drop column,add column,暂不支持修改数据类型

⑤ --single-sink "true"/"false" (since 1.5.0)

是否使用单个Sink同步所有表,开启后也可自动识别上游新创建的表,自动创建表

⑥ --multi-to-one-origin && --multi-to-one-target (since 1.5.0)

支持将上游多张表写入Doris中一张表

四、最佳实践

1. 同步主键表和非主键表

Apache Doris主要有Unique,Aggreate,Duplicate三种数据模型,对于数据源(MySQL,Oracle,Postgres,SQL Server)库中含有主键的表,将直接将该表映射成Doris Unique 表,而其他不包含主键的表,将直接映射成Doris中的dup表,对于MySQL同步非主键表,需要--mysql-conf scan.incremental.snapshot.chunk.key-column 来设置非主键表的chunk key。

如下例子中,MySQL ssb_test库中含有all_types,supplier,dates三张表,其中supplier,dates是两张无主键表,需要设置chunk.key,否则导入过程会出现如下错误:

代码语言:javascript
代码运行次数:0
复制
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
        at com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getChunkKeyColumn(ChunkUtils.java:67)
        at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:152)

flink-connector-doris利用Flink CDC整库同步的过程分为全量+增量两个过程,其中全量读取阶段可以并发无锁读取,增量阶段则切为单线程读取binlog,以防止重复拉去binlog数据。

如下将MySQL 非主键表和主键表导入到Doris中的shell脚本如下:

代码语言:javascript
代码运行次数:0
复制
bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.6.0.jar \
    mysql-sync-database \
    --database ssb_test_doris \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=ssb_test \
    --including-tables "all_types|supplier|dates" \
    --mysql-conf scan.incremental.snapshot.chunk.key-column="ssb_test.supplier:s_suppkey,ssb_test.dates:d_datekey"
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1 \

启动flink任务后,可以通过flink webui观察导入任务,如下图:

图默认每个表一个sink

图开启single-sink ,多个表一个sink

可通过Flink metrics 查看当前每个表已经导入的数据总量以及其他metrics指标,如下图:

当整库同步从全量阶段切换到增量阶段时,可以通过日下日志看到全量阶段的split 都已经完成。

代码语言:javascript
代码运行次数:0
复制
21:20:31,896 INFO  com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus [SourceCoordinator-Source: MySQL Source]  - Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED
21:20:31,896 INFO  com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner [SourceCoordinator-Source: MySQL Source]  - Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.

2. 分库分表

在数据同步中,一个常见的使用场景是将上游由于业务或数据库性能问题而拆分的多表在下游系统合并为一张表。自connector-1.5.0开始支持将上游数据源多张表,sink到Doris一张表。

例如数据源Postgres数据源有6张表,其中order_1~order_3,customer_1~customer_2表结构相同。

如果我们想将业务库多张分表sink到Doris 同一张表,shell整库同步脚本如下:

代码语言:javascript
代码运行次数:0
复制
/bin/flink run \
     -Dexecution.checkpointing.interval=10s \
     -Dparallelism.default=1\
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.17-1.6.0.jar \
     postgres-sync-database \
     --database db1\
     --postgres-conf hostname=127.0.0.1 \
     --postgres-conf port=5432 \
     --postgres-conf username=postgres \
     --postgres-conf password="123456" \
     --postgres-conf database-name=postgres \
     --postgres-conf schema-name=public \
     --postgres-conf slot.name=test \
     --postgres-conf decoding.plugin.name=pgoutput \
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=123456 \
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1 \
     --multi-to-one-origin "order_.*|customer_.*" \
     --multi-to-one-target "ods_order|ods_customer"

同步结果如下所示:

3. Schema Change

当数据源如 MySQL 或 Oracle 发生表结构更改时,connector 支持同步以下三种数据定义语言(DDL)变更到 Doris:增加列、删除列和更改表名。同步这些变更的主要原理是通过解析数据源发生结构更改时生成的 JSON 日志,特别是解析其中的 DDL 语句,以此触发 Doris 对应的 schema change任务。

然而,对于 SQL Server 和 Db2 的情况,当表结构发生更改时,Debezium 生成的 JSON 日志中的 DDL 属性通常为 null。这导致 connector 无法通过解析 DDL 捕获数据源的表结构变化,进而无法在 Doris 中触发 schema 更改任务。有关这个问题的详细文档,请参考以下链接:

  • Debezium connector for Db2 :: Debezium Documentation
    • https://debezium.io/documentation/reference/connectors/db2.html
  • Debezium connector for SQL Server :: Debezium Documentation
    • https://debezium.io/documentation/reference/connectors/sqlserver.html

① 如下在MySQL 中有一张all_types 的表,结构如下:

代码语言:javascript
代码运行次数:0
复制
create table all_types
(
    tinyint_u   tinyint unsigned      not null primary key,
    smallint_u  smallint unsigned                     default 1                          null,
    mediumint_u mediumint unsigned                    default 1                          null,
    int_test2   int                                                                      null,
    bigint_u    bigint unsigned                                                          null,
    decimal_u   decimal(18, 5) unsigned               default 123.66667                  null,
    double_u    double unsigned                       default '12.1'                     null,
    float_u     float unsigned                        default '1.11'                     null,
    boolean     tinyint(1)                            default 0                          null,
    `tinyint`   tinyint                                                                  null,
    `smallint`  smallint                                                                 null,
    year        year                                  default '1970'                     null,
    `mediumint` mediumint                             default 12                         null,
    `int`       int                                   default 100                        null,
    `bigint`    bigint                                default 11000000                   null,
    date        date                                  default '2024-01-01'               null,
    timestamp   timestamp(4)                          default '2024-01-01 01:01:01.1111' null,
    datetime    datetime                              default '2011-01-01 01:01:01'      null,
    `float`     float                                 default 1.233                      null,
    `double`    double                                default 1.2344545                  null,
    `decimal`   decimal(12, 4)                                                           null,
    `char`      char(5)                               default 'doris'                    null,
    `varchar`   varchar(10)                                                              null,
    time        time(4)                               default '00:00:00.0000'            null,
    text        text                                                                     null,
    `blob`      blob                                                                     null,
    json        json                                                                     null,
    `set`       set ('Option1', 'Option2', 'Option3') default 'Option1'                  null,
    bit         bit(6)                                default b'110001'                  null,
    `binary`    binary(12)                            default doris               null,
    `varbinary` varbinary(12)                         default doris                      null,
    enum        enum ('Value1', 'Value2', 'Value3')                                      null,
    ts_0        timestamp                             default CURRENT_TIMESTAMP          null,
    ts_1        timestamp                             default CURRENT_TIMESTAMP          null,
    ts_2        timestamp(6)                          default CURRENT_TIMESTAMP(6)       null,
    t_text      varchar(10)                           default 'doris'''                  null
);

② 通过在增加use-new-schema-change 选项,可以通过如下shell脚本同步数据源的表结构更改(1.6.0默认打开):

代码语言:javascript
代码运行次数:0
复制
bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.6.0.jar \
    mysql-sync-database \
    --database doris_test \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=23306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=doris_test \
    --including-tables "all_types" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1 \

③ 例如在MySQL 中更改all_types表中的t_text 列的名字

MySQL 5.7.x 的执行语法如下(1.6.0 兼容了MySQL 5.7.x change 语法):

代码语言:javascript
代码运行次数:0
复制
alter table all_types change t_text t_text_new_name varchar(10);(since 1.6.0)
或者
alter table all_types change column t_text t_text_name varchar(10);

MySQL 8.0.x 更改列名语法如下:

代码语言:javascript
代码运行次数:0
复制
alter table all_types rename column t_text to t_text_new_name; (since 1.5.0)

rename之前Doris表结构:

rename 之后Doris表结构:

4. 动态加表

在1.5.0 之前的版本中,对于数据源新增的表需要另外起任务,如果新增表较多显然是比较麻烦的。通过在shell 中配置--single-sink ,可以自动识别上游自动创建的表。

例如在MySQL testdb中增加一张test_datetime_new_add表,shell脚本如下,只需要增加`single-sink`即可。不过需要注意,新增加的表需要在including-tables内,excluding-tables外,否则会被拦截。

当前由于connector 依赖的flink-cdc 为2.4.2, ,Oracle,Sql Server,Postgres新增表的特性,可能需要等待Flink CDC 3.1 详情见:

https://github.com/apache/flink-cdc/pull/2385/files

https://github.com/apache/flink-cdc/pull/3024

代码语言:javascript
代码运行次数:0
复制
bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.6.0.jar \
    mysql-sync-database \
    --database testdb \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=23306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=testdb \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1 \
    --use-new-schema-change true \
    --single-sink true

当在数据源新创建表后,可以在flink taskmanager日志中查看到相对应的ddl日志信息以及新增表创建状态信息。

5. Flink Connector Doris 自动建表

如果有的用户希望单纯只使用connector来进行关系型数据到Doris ddl的转换而不进行数据的导入,那么用户可以在shell脚本中增加如下参数:

代码语言:javascript
代码运行次数:0
复制
--create-table-only true 

Flink-connector整库同步自动建表能够满足大多数人的要求,对含有主键表一般会自动映射成unique key ,然而对于无主键表,一般会使用每个表的第一个列当作duplicat key。目前对于数据源的唯一索引暂时还不支持映射为unique key。如果有的用户对于connector自动建的表不满意,可以在开始时设置`--create-table-only` true,等表结构全部映射过去之后,对于需要调整的表结构,可以进行相对应的调整。例如:当你将带有唯一索引的表改为uinque key模型后,重新启动flink整库同步任务之后,会首先在校验数据源映射过去的表是否在Doris存在,如果存在,则不进行自动建表。

代码语言:javascript
代码运行次数:0
复制
彩蛋:RDMS DDL TO Doris
方案一:Flink整库同步
支持整库级别的自动建表,操作简单
方案二:JDBC catalog 
create table table_name like external_catalog.db.table;
方案三:SQL Convertor
https://play.selectdb.com/sql-convertor

五、常见FAQ

1. java.lang.NoSuchFiledError:

Scan_INCREMENTAL_CLOSE_IDLE_READER_ENABLE

Flink-connector-doris支持整库同步是基于Flink CDC 2.4.x,所以使用整库同步时请在lib中引用flink-sql-cdc-2.4.x。

2. java.lang.NoClassDefFoundError:

com/ververica/cdc/common/utils/StringUtils

出现该错误也是因为Flink CDC3.0.1导致的,如:

https://github.com/apache/flink-cdc/pull/3036

3. 整库同步过程中出现Failed to execute sql:

org.apache.doris.common.AnalysisException: errCode = 2, detailMessage = Syntax error in line "xxx".

这种问题通常是由于源数据表的 DDL 不符合 Doris 的规范,导致建表失败。解决这个问题的方法有两种:

排除不兼容表:您可以选择将不兼容的表排除在外,使用 --excluding-tables 参数进行设置。

修改数据源表的结构:您可以修改源数据表的结构,以符合 Doris 的规范。主要涉及到表名、列名甚至默认值的修改。以下是 Doris 表名和列名的正则表达式:

  • Doris 表名正则表达式:^[a-zA-Z][a-zA-Z0-9-_]*$
  • Doris 列名正则表达式:^[_a-zA-Z@0-9\\s<>/][.a-zA-Z0-9_+-/><?@#

如果您遇到的是默认值不兼容的问题,可以参考以下解决方案:

  • 修改数据源表的结构,去掉不兼容的默认值。
  • 升级到1.6.0版本:该版本将忽略同步数据源默认设置,因为对于 OLAP 而言,同步默认值没有意义。

此外,connector 在后续版本中将增加一个忽略不兼容 DDL 的选项,从而自动忽略不符合 Doris 规范的表格。

4. 使用整库同步 MySQL 数据到 Doris,出现 timestamp 类型与源数据相差多个小时。

由于历史原因和版本兼容性问题,整库同步过程中默认配置了时区设置为 `timezone="UTC+8"`。如果您需要同步的数据来源于不同的时区,您可以通过调整配置来匹配正确的时区。例如,如果您的数据时区为 UTC+3,可以通过以下设置来调整:

代码语言:javascript
代码运行次数:0
复制
--mysql-conf debezium.date.format.timestamp.zone="UTC+3"

这样的设置确保数据同步过程中时间戳正确地反映了数据的原始时区,从而避免因时区差异导致的时间错误。

5. Oracle 数据捕获延时高

开启这个参数后,可以减少数据同步的延迟和减少 redo 日志的存储,但是这样无法处理 DDL 语句。生产环境默认策略读取 log 较慢,且默认策略会写入数据字典信息到 redo log 中导致日志量增加较多,可以添加如下 debezium 的配置项,该参数不适合Oracle19。

https://developer.aliyun.com/article/834483

代码语言:javascript
代码运行次数:0
复制
--oracle-conf "debezium.log.mining.strategy"="catalog"
--oracle-conf "debezium.log.mining.continuous.mine"="true"

6. 同步 Oracle Clob/Blob 类型到 Doris 为 null

可以增加如下配置,该参数默认是false,如果设置为ture,可能会影响同步性能。

代码语言:javascript
代码运行次数:0
复制
--oracle-conf "debezium.lob.enabled"="true"
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-04-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一臻数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档