介绍
MySQL 的 CDC 源表,支持对 MySQL 数据库的全量和增量读取,并保证 Exactly Once 语义。MySQL CDC 底层使用了 Debezium 来实现对变更数据流的实时抓取(Change Data Capture)。
MySQL CDC 1.x 工作机制
1. 获取一个全局读锁,从而阻塞住其他数据库客户端的写操作。
2. 开启一个可重复读语义的事务,来保证后续在同一个事务内读操作都是在一个一致性快照中完成的。
3. 读取 Binlog 的当前位置。
4. 读取连接器中配置的数据库和表的模式(schema)信息。
5. 释放全局读锁,允许其他的数据库客户端对数据库进行写操作。
6. 扫描全表,当全表数据读取完后,会从第3步中得到的 Binlog 位置获取增量的变更记录。
Flink 作业运行期间会周期性执行快照,记录下 Binlog 位置,当作业崩溃恢复时,便会从之前记录的 Binlog 点继续处理,从而保证 Exactly once 语义。
MySQL CDC 2.x 工作机制
1. MySQL 表需要有主键,如果是联合主键则会选择数据表中的第一个主键作为全量阶段的 splitKey,其用来将数据分为多个分片(Chunk)。
2. 全量阶段使用无锁算法,无需给表加锁。
3. 整个同步过程分为两个阶段,全量阶段并发读取分片数据,全量阶段结束之后进入增量阶段,整个过程都支持 Checkpoint 从而保证 Exactly once 语义。
版本说明
Flink 版本 | 说明 |
1.11 | 支持 MySQL 版本为 5.6 |
1.13 | 支持 MySQL 版本为 5.6, 5.7, 8.x 默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false' |
1.14 | 支持 MySQL 版本为 5.6, 5.7, 8.x 默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false' |
1.16 | 支持 MySQL 版本为 5.6, 5.7, 8.x 默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置
'scan.incremental.snapshot.enabled' = 'false' |
使用范围
MySQL CDC 只支持作为源表。
DDL 定义
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '192.168.10.22', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'hello@world!', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);
WITH 参数
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值为 mysql-cdc |
hostname | MySQL 数据库的 IP 地址或者 Hostname | 是 | - |
port | MySQL 数据库服务的端口号 | 否 | 默认值为3306 |
username | MySQL 数据库服务的用户名 | 是 | 有特定权限(包括 SELECT、RELOAD、SHOW DATABASES、REPLICATION SLAVE 和 REPLICATION CLIENT)的 MySQL 用户 |
password | MySQL 数据库服务的密码 | 是 | - |
database-name | MySQL 数据库名称 | 是 | 数据库名称支持正则表达式以读取多个数据库的数据 |
table-name | MySQL 表名 | 是 | 表名支持正则表达式以读取多个表的数据 |
server-id | 数据库客户端的一个 ID | 否 | 该 ID 必须是 MySQL 集群中全局唯一的。建议针对同一个数据库的每个作业都设置不同的 ID 范围值,例如 5400-5405 。默认会随机生成一个6400 - Integer.MAX_VALUE 的值 |
server-time-zone | 数据库在使用的会话时区 | 否 | 例如 Asia/Shanghai,该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型 |
append-mode | 开启 append 流模式 | 否 | Flink1.13及以上版本支持, 例如:将 mysql-cdc 数据以 append 的方式同步到 hive |
filter-duplicate-pair-records | 过滤未在 Flink DDL 语句中定义的源表字段变更记录 | 否 | 例如 MySQL 源表有 a, b, c, d 四个字段,而用户在 Flink SQL 建表时只定义了 a, b 两个字段;开启该参数后,仅涉及 c 或 d 字段的变更记录会被忽略,不会输出到下游,可减少计算量和处理压力 |
scan.lastchunk.optimize.enable | 对全量阶段的最后一个分片做重划分 | 否 | 如果全量同步期间,源表持续有大量写入和变更,则可能导致最后一个分片过大,引起 TaskManager OOM 崩溃重启。
开启本功能后(值设置为 true),Flink 会自动将过大的最后一个分片分成若干的小分片,提升作业的稳定性 |
debezium.min.row.count.to.stream.results | 当表的条数大于该值时,会使用分批读取模式 | 否 | 默认值为1000。Flink 采用以下方式读取 MySQL 源表数据: 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有 OOM 风险 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有 OOM 风险,缺点是读取速度相对较慢 |
debezium.snapshot.fetch.size | 在 Snapshot 阶段,每次读取 MySQL 源表数据行数的最大值 | 否 | 仅当分批读取模式时,该参数生效 |
debezium.skipped.operations | 需要过滤的 oplog 操作。操作包括 c 表示插入,u 表示更新,d 表示删除。默认情况下,不跳过任何操作,以逗号分隔 | 否 | - |
scan.incremental.snapshot.enabled | 增量快照 | 否 | 默认为 true |
scan.incremental.snapshot.chunk.size | 当读取表的快照时,表快照捕获的表的块大小(行数) | 否 | 默认为 8096 |
scan.lazy-calculate-splits.enabled | 全量阶段JM中数据分片懒加载避免数据量太大,分片数据太多导致JM OOM | 否 | 默认为 true |
scan.newly-added-table.enabled | 动态加表 | 否 | 默认为 false |
scan.split-key.mode | 联合主键作为 splitkey 的模式 | 否 | 取值为 default / specific;其中 default 为默认逻辑,采用联合主键的第一个字段作为 split key;设置为 specific 需要设置 scan.split-key.specific-column 指定联合主键中的某个字段 |
scan.split-key.specific-column | 指定联合主键中某个字段作为 splitkey | 否 | 当 scan.split-key.mode 为 specific 时必填。取值为联合主键中某个字段名 |
scan.startup.mode | MySQL CDC 消费者可选的启动模式 | 否 | 合法的模式为 "initial"(默认),"earliest-offset","latest-offset","specific-offset" 和 "timestamp" |
scan.startup.specific-offset.file | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置 | 否 | - |
scan.startup.specific-offset.pos | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置 | 否 | - |
scan.startup.specific-offset.gtid-set | 在 "specific-offset" 启动模式下,启动位点的 GTID 集合 | 否 | - |
scan.startup.timestamp-millis | 在 "timestamp" 启动模式下,启动时间的毫秒时间戳 | 否 | - |
scan.startup.specific-offset.skip-events | 在指定的启动位点后需要跳过的事件数量 | 否 | - |
scan.startup.specific-offset.skip-rows | 在指定的启动位点后需要跳过的数据行数量 | 否 | - |
connect.timeout | 尝试连接到 MySQL 数据库服务器后在超时之前等待的最长时间 | 否 | 默认 30s |
connect.max-retries | 建立MySQL连接尝试最大的次数 | 否 | 默认 3 |
connection.pool.size | 连接池大小 | 否 | 默认 20 |
jdbc.properties.* | 自定义JDBC URL参数,例如: 'jdbc.properties.useSSL' = 'false' | 否 | 默认 20 |
heartbeat.interval | 发送心跳事件的时间间隔,用于跟踪最新可用的binlog偏移量, 一般用于解决慢表的问题(更新缓慢的数据表) | 否 | 默认 20 |
debezium.* | Debezium 属性参数 | 否 |
可用元数据(Flink1.13 及以上版本可使用)
支持的元数据列:
列 | 数据类型 | 描述 |
database_name/meta.database_name | STRING NOT NULL | 包含该 Row 的数据库名称 |
table_name/meta.table_name | STRING NOT NULL | 包含该 Row 的表名称 |
op_ts/meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | Row 在数据库中进行更改的时间 |
meta.batch_id | BIGINT | binlog 的批 id |
meta.is_ddl | BOOLEAN | 是否 DDL 语句 |
meta.mysql_type | MAP | 数据表结构 |
meta.update_before | ARRAY | 未修改前字段的值 |
meta.pk_names | ARRAY | 主键字段名 |
meta.sql | STRING | 暂时为空 |
meta.sql_type | MAP | sql_type 表的字段到 Java 数据类型 ID 的映射 |
meta.ts | TIMESTAMP_LTZ(3) NOT NULL | 收到该 ROW 并处理的当前时间 |
meta.op_type | STRING | 数据库操作类型,例如 INSERT/DELETE 等 |
meta.file | STRING | 全量阶段时为空。增量阶段时为数据来自的 binlog 文件名,例如 mysql-bin.000101 |
meta.pos | BIGINT | 全量阶段时为0。增量阶段时为数据来自的 binlog 文件偏移,例如 143127802 |
meta.gtid | STRING | 全量阶段时为 null。增量阶段时为数据对应的 gtid 值,例如 3d3c4464-c320-11e9-8b3a-6c92bf62891a:66486240 |
使用示例
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`name` STRING,`database_name` string METADATA FROM 'database_name',`table_name` string METADATA FROM 'table_name',`op_ts` timestamp(3) METADATA FROM 'op_ts',`op_type` string METADATA FROM 'meta.op_type',`batch_id` bigint METADATA FROM 'meta.batch_id',`is_ddl` boolean METADATA FROM 'meta.is_ddl',`update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',`mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',`pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',`sql` STRING METADATA FROM 'meta.sql',`sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',`ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '192.168.10.22', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'hello@world!', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);
MySQL 分库分表读取方式
目前 Oceanus 已支持 MySQL 分库分表的读取。
如果 MySQL 是一个分库分表的数据库,分成了 A_1、 A_2、A_3 ... 等多个表,且所有表的 schema 一致,则可以通过 table-name 选项,指定一个正则表达式来匹配读取多张表,例如设置 table-name 为 A_.* ,监控所有 A_ 前缀的表。database-name 选项也支持该功能。
说明
如果 database-name 和 table-name 设置为正则匹配的话,需要使用
()
把正则式包围起来。 类型映射
MySQL 的 CDC 和 Flink 字段类型对应关系如下:
MySQL type | Flink SQL type | NOTE |
TINYINT | TINYINT | - |
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL | SMALLINT | - |
INT MEDIUMINT SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL | INT | - |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL | BIGINT | - |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL | DECIMAL(20, 0) | - |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL | FLOAT | - |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL | DOUBLE | - |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 | DECIMAL(p, s) | - |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 | STRING | MySQL 中 DECIMAL 数据类型的精度最高为 65,而 Flink 中 DECIMAL 的精度限制为 38。 所以如果您定义一个精度大于38的十进制列,您应该把它映射到STRING,以避免精度损失 |
BOOLEAN TINYINT(1) BIT(1) | BOOLEAN | - |
DATE | DATE | - |
TIME [(p)] | TIME [(p)] | - |
TIMESTAMP [(p)] DATETIME [(p)] | TIMESTAMP [(p)] | - |
CHAR(n) | CHAR(n) | - |
VARCHAR(n) | VARCHAR(n) | - |
BIT(n) | BINARY(⌈n/8⌉) | - |
BINARY(n) | BINARY(n) | - |
VARBINARY(N) | VARBINARY(N) | - |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT | STRING | - |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB | BYTES | 对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2,147,483,647(2 ** 31 - 1) 的 blob |
YEAR | INT | - |
ENUM | STRING | - |
JSON | STRING | JSON 数据类型会在 Flink 中转换为 JSON 格式的 STRING |
SET | ARRAY<STRING> | 由于 MySQL 中的 SET 数据类型是一个可以有零个或多个值的字符串对象,所以它应该总是映射到一个字符串数组 |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION | STRING | MySQL 中的空间数据类型会被转换成固定 Json 格式的 STRING |
代码示例
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '192.168.10.22', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'hello@world!', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);CREATE TABLE `print_table` (`id` INT,`name` STRING) WITH ('connector' = 'print');insert into print_table select * from mysql_cdc_source_table;
注意事项
Checkpoint 相关
使用 CDC 1.0 ('scan.incremental.snapshot.enabled' = 'false') 时,需要做的额外参数配置。
由于 CDC 1.0 读取全量数据阶段无法做 checkpoint,当需要同步的表较多、数据较大时,可能会导致多次快照失败,从而引发作业失败。可以通过作业高级参数
execution.checkpointing.tolerable-failed-checkpoints: 100
调整 checkpoint 失败的容忍次数。使用 CDC 2.0,且作业的默认并行度大于 1 时,必须开启 checkpoint。
CDC 读取完全量数据后,需要等待一个 checkpoint 完成后才能进入增量阶段。
关于使用 CDC 1.0 的风险告知
当表没有主键时,只能通过使用 WITH 参数
'scan.incremental.snapshot.enabled' = 'false'
开启 CDC 1.0 模式,会存在以下风险:1. 默认会使用 FTWRL (flush table with read lock)。
2. 虽然 FTWRL 只会持有短暂的时间,但由于 FTWRL 的机制,可能会导致锁库。
3. FTWRL 可能会出现的情况如下:
会等待正在执行的 update/select 操作完成。
在等待 update/select 完成的期间,会导致数据库不可用,即阻塞新加入的 SELECT 查询,这是 MySQL Query Cache 机制导致的。
如果同时启动多个不同的 MySQL CDC 1.0 的 source,大概率会碰到上述情况。
用户权限
用于同步的源数据库的用户必须拥有以下权限 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD。
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';FLUSH PRIVILEGES;
全局读锁
上述的工作原理中,MySQL CDC 1.x 中可以看到第一步就会获取一个全局读锁,用于获取 schema 和 Binlog 位置。这里会阻塞其他客户端的写入,因此仍可能对线上业务造成影响。 若可以接受 At Least Once 语义,可通过设置 WITH 参数
'debezium.snapshot.locking.mode' = 'none'
跳过这个阶段。联合主键设置
例如,下面的 DDL 设置了
index1
、index2
、index3
、index4
4个字段为联合主键索引,要和 PRIMARY KEY
定义保持一致,顺序不会影响正常的同步。CREATE TABLE db_order_dim (`index1` STRING,`index2` STRING,`index3` STRING,`index4` STRING,`field5` STRING,`field6` STRING,PRIMARY KEY(`index1`, `index2`, `index3`, `index4`) NOT enforced) WITH (...);
server-id 定义
不建议显式指定
server-id
,因为 Oceanus 平台会为每个表自动生成随机 server-id
值(范围是 6400 - 2147483647
),以避免不同作业读取同一个库可能出现的 server-id
冲突问题。如果必须要手动指定
server-id
值,对于 CDC 2.x 版本,建议设置为范围值,例如 5400-5405
,因为每个并行读取器应该有一个唯一的服务器 ID,所以 server-id
必须是 5400-5405
这样的范围,且范围必须大于并行度。但对于 CDC 1.x 版本,只能设置单个 server-id
值,不支持范围设定。指定
server-id
有以下两种方式:1.
mysql-cdc
DDL 的 WITH 参数中指定。2. 使用 SQL Hints 来指定
server-id
。SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
设置 MySQL 会话超时
全量阶段读取大型数据库的时候可能会超时,您可以对 MySQL 做一些配置来避免这个问题。
interactive_timeout
服务器在关闭交互式连接之前等待其活动的秒数。请参见 MySQL 文档。wait_timeout
服务器在关闭非交互式连接之前等待其活动的秒数。请参见 MySQL 文档。增量阶段也有可能因为 TaskManager 负载过高导致心跳失效,服务端主动断开连接(EOFException)。可以在 MySQL 服务端执行下面的 SQL 语句,调大超时时间:
SET GLOBAL slave_net_timeout = 120;SET GLOBAL thread_pool_idle_timeout = 120;
JobManager 关键日志说明
在 CDC 2.x 模式下,每个表的同步都要经历分片划分、全量快照同步、增量修正、纯增量同步等阶段。由于前三个阶段资源占用大、耗时久,Oceanus 在日志和指标方面做了加强,协助用户洞察和分析作业的运行情况。
1. 分片划分与分配
开始划分分片:搜索
into chunks
关键字,例如 Start splitting table cdc_basic_source.random_source_1 into chunks
或者 Start lazily splitting table cdc_basic_source.random_source_1 into chunks
结束划分分片: 搜索
chunks, time cost
关键字,例如 Split table cdc_basic_source.random_source_1 into 14 chunks, time cost: 994ms.
2. 全量快照同步
查看全量分片分配进度:开启
DEBUG
级别日志,搜索 Current assigned splits for
关键字,即可查看每个表的总分片数和分配进度。结束全量分片分配:搜索
finished. Total split number
关键字,例如 Split assignment for cdc_basic_source.random_source_1 finished. Total split number: 14
结束全量阶段:搜索
Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED
日志。3. 增量修正
开始增量修正阶段:搜索
Initial assigning finished as there are no more splits. Creating binlog split
或者 Newly added assigning finished as there are no more splits. Waking up binlog reader
日志。4. 纯增量同步
参见下一节 “TaskManager 关键日志说明”。
TaskManager 关键日志说明
进入纯增量同步阶段:搜索
has entered pure binlog phase
日志。例如 Table cdc_basic_source.random_source_2 has entered pure binlog phase.
表的 Schema 变更:搜索
Received schema change event
日志。EOFException 异常:如果作业重启,且提示异常是
EOFException
,则可根据提示语,调整 MySQL 服务端的超时参数。同时也可以减少总并行度,升级每个 TaskManager 的规格,以减少内存和 CPU 压力过高导致的超时问题的发生概率。监控指标说明
logpos:获取当前消费到的 Binlog 位点,可以协助定位消费卡顿等问题。
numberOfInsertRecords:获取输出的 +I 消息数。
numberOfDeleteRecords:获取输出的 -D 消息数。
numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
numberOfUpdateAfterRecords:获取输出的 +U 消息数。