本文档将手把手的指引您完成 MySQL 实时同步至 Iceberg 的流程操作,包括整库迁移和单表同步。大体的流程如下:
一、数据源兼容情况。
二、MySQL 环境准备。
三、Iceberg 环境准备。
四、项目配置及资源组准备。
五、实时整库:MySQL 同步至 Iceberg 配置步骤。
六、实时单表:MySQL 同步至 Iceberg 配置步骤。
七、实时节点高级参数。
八、常见问题。
一、数据源兼容情况
MySQL 数据源
如果您想使用 MySQL 进行实时数据同步操作,需要先确认 MySQL 数据源版本支持情况、使用限制及支持的数据类型转换。
支持版本
目前数据集成已支持 MySQL 单表及整库级实时读取,使用实时读取能力需遵循以下版本限制:
类型 | 版本 | Driver |
MySQL | 5.6,5.7,8.0.x | JDBC Driver:8.0.21 |
RDS MySQL | 5.6,5.7, 8.0.x | |
PolarDB MySQL | 5.6,5.7,8.0.x | |
Aurora MySQL | 5.6,5.7,8.0.x | |
MariaDB | 10.x | |
PolarDB X | 2.0.1 | |
使用限制
需要开启 Binlog 日志,仅支持同步 MySQL 服务器 Binlog 配置格式为 ROW。
无主键的表由于无法保证 exactly once 可能会有数据重复,因此实时同步任务要保证有主键。
不支持 XA ROLLBACK,实时同步的任务不会针对 XA PREPARE 的数据进行回滚的操作,若要处理 XA ROLLBACK 场景,需要手动将 XA ROLLBACK 的表从实时同步任务中移除,再添加表后重新进行同步。
设置 MySQL 会话超时:
当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置 interactive_timeout 和 wait_timeout 来防止这种行为。
interactive_timeout:服务器在关闭交互式连接之前等待其活动的秒数。请参阅 MySQL :: MySQL 8.0 Reference Manual :: 7.1.8 Server System Variables。
wait_timeout:服务器在关闭非交互式连接之前等待其活动的秒数。请参阅 MySQL :: MySQL 8.0 Reference Manual :: 7.1.8 Server System Variables。
MySQL 读取数据类型转换
MySQL 读取支持的数据类型以及内部映射字段如下所示(在处理 MySQL 时,会先将 MySQL 数据源的数据类型和数据处理引擎的数据类型做映射):
字段类型 | 是否支持 | 内部映射字段 | 备注 |
TINYINT | 是 | TINYINT | TINYINT(1) 映射到 BOOLEAN 需要增加选项支持 TINYINT(1) 可以映射到 bool 或者 tinyint |
SMALLINT | 是 | SMALLINT | - |
TINYINT_UNSIGNED | 是 | SMALLINT | - |
TINYINT_UNSIGNED_ZEROFILL | 是 | SMALLINT | - |
INT | 是 | INT | - |
INTEGER | 是 | INT | - |
YEAR | 是 | INT | - |
MEDIUMINT | 是 | INT | - |
SMALLINT_UNSIGNED | 是 | INT | - |
SMALLINT_UNSIGNED_ZEROFILL | 是 | INT | - |
BIGINT | 是 | LONG | - |
INT_UNSIGNED | 是 | LONG | - |
MEDIUMINT_UNSIGNED | 是 | LONG | - |
MEDIUMINT_UNSIGNED_ZEROFILL | 是 | LONG | - |
INT_UNSIGNED_ZEROFILL | 是 | LONG | - |
BIGINT_UNSIGNED | 是 | DECIMAL | DECIMAL(20,0) |
BIGINT_UNSIGNED_ZEROFILL | 是 | DECIMAL | DECIMAL(20,0) |
SERIAL | 是 | DECIMAL | DECIMAL(20,0) |
FLOAT | 是 | FLOAT | - |
FLOAT_UNSIGNED | 是 | FLOAT | - |
FLOAT_UNSIGNED_ZEROFILL | 是 | FLOAT | - |
DOUBLE | 是 | DOUBLE | - |
DOUBLE_UNSIGNED | 是 | DOUBLE | - |
DOUBLE_UNSIGNED_ZEROFILL | 是 | DOUBLE | - |
DOUBLE_PRECISION | 是 | DOUBLE | - |
DOUBLE_PRECISION_UNSIGNED | 是 | DOUBLE | - |
ZEROFILL | 是 | DOUBLE | - |
REAL | 是 | DOUBLE | - |
REAL_UNSIGNED | 是 | DOUBLE | - |
REAL_UNSIGNED_ZEROFILL | 是 | DOUBLE | - |
NUMERIC | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
NUMERIC_UNSIGNED | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
NUMERIC_UNSIGNED_ZEROFILL | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
DECIMAL | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
DECIMAL_UNSIGNED | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
DECIMAL_UNSIGNED_ZEROFILL | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
FIXED | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
FIXED_UNSIGNED | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
FIXED_UNSIGNED_ZEROFILL | 是 | DECIMAL | 采用用户数据库实际的精度 p<=38 映射到 DECIMAL 38 < p <= 65 时映射到 String |
BOOLEAN | 是 | BOOLEAN | - |
DATE | 是 | DATE | - |
TIME | 是 | TIME | - |
DATETIME | 是 | TIMESTAMP | - |
TIMESTAMP | 是 | TIMESTAMP | - |
CHAR | 是 | STRING | - |
JSON | 是 | STRING | - |
BIT | 是 | STRING | BIT(1) 映射到 BOOLEAN |
VARCHAR | 是 | STRING | - |
TEXT | 是 | STRING | - |
BLOB | 是 | STRING | - |
TINYBLOB | 是 | STRING | - |
TINYTEXT | 是 | STRING | - |
MEDIUMBLOB | 是 | STRING | - |
MEDIUMTEXT | 是 | STRING | - |
LONGBLOB | 是 | STRING | - |
LONGTEXT | 是 | STRING | - |
VARBINARY | 是 | STRING | - |
GEOMETRY | 是 | STRING | - |
POINT | 是 | STRING | - |
LINESTRING | 是 | STRING | - |
POLYGON | 是 | STRING | - |
MULTIPOINT | 是 | STRING | - |
MULTILINESTRING | 是 | STRING | - |
MULTIPOLYGON | 是 | STRING | - |
GEOMETRYCOLLECTION | 是 | STRING | - |
ENUM | 是 | STRING | - |
BINARY | 是 | BINARY | BINARY(1) |
SET | 否 | - |
Iceberg 数据源
如果您想进行实时数据同步至 Iceberg 操作,需要先确认 Iceberg 数据源版本支持情况、使用限制及支持的数据类型转换。
支持版本
Iceberg 支持 0.11 及以上版本。
使用限制
Iceberg 写入后,为提高查询性能,一般要求下游通过定时 Spark Action 进行小文件合并,并合理配置 Checkpoint 周期。
Iceberg 写入数据类型转换
Iceberg 写入支持的数据类型及内部映射字段类型如下所示(在处理 Iceberg 的时候,会将 Iceberg 数据源的数据类型和数据处理引擎的数据类型做映射):
内部类型 | Iceberg 类型 |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
BOOLEAN | BOOLEAN |
BINARY | FIXED(L) |
VARBINARY | BINARY |
DECIMAL | DECIMAL(P,S) |
TINYINT | INT |
SMALLINT | INT |
INTEGER | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMPTZ |
INTERVAL | - |
ARRAY | LIST |
MULTISET | MAP |
MAP | MAP |
ROW | STRUCT |
RAW | - |
二、MySQL 环境准备
确认 MySQL 版本
数据集成对 MySQL 版本有要求,查看当前待同步的 MySQL 是否符合版本要求。您可以在 MySQL 数据库通过如下语句查看当前 MySQL 数据库版本。
select version();
设置 MySQL 服务器权限
您必须定义一个对 Debezium MySQL 连接器监控的所有数据库具有适当权限的 MySQL 用户。
1. 创建 MySQL 用户(可选)
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
2. 向用户授予所需的权限:
在实时数据同步的情况下,该账号必须拥有数据库的 SELECT、REPLICATION SLAVE 和 REPLICATION CLIENT 权限。执行命令可以参考下面:
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
说明:
启用 scan.incremental.snapshot.enabled 时不再需要 RELOAD 权限(默认启用)。
3. 刷新用户的权限:
mysql>FLUSH PRIVILEGES;
开启 MySQL Binlog
1. 检查 binlog 是否开启
show variables like "log_bin"
返回结果为 ON 时,表示已经开启 Binlog, 如果为备库,使用如下语句:
show variables like "log_slave_updates";
如果返回为 ON 时,表示已经开启 Binlog,如果已经开启 Binlog,可跳过下面流程。
2. 开启 Binlog
如果确认没有开启 Binlog,则需要进行以下操作。
对于腾讯云实例 MySQL / TDSQL-C MySQL,默认开启了 binlog。
对于开源 MySQL,参考官方文档开启 binlog。
3. 修改 Binlog 格式为 Row
实时同步仅支持同步 MySQL 服务器 Binlog 配置格式为 ROW,使用如下语句查询 Binlog 的使用格式。
show variables like "binlog_format";
如果返回非 ROW 请修改 Binlog Format。
对于开源 MySQL,参考官方文档:MySQL :: MySQL 8.0 Reference Manual :: 19.1.6.4 Binary Logging Options and Variables。
对于腾讯云实例 MySQL / TDSQL-C MySQL:
登录腾讯云 MySQL / TDSQL-C MySQL 控制台,找到要开启 Binlog 的实例,单击进入该实例的详细信息页面。
在上面选项卡中选择数据库管理,找到参数设置选项卡。
在参数设置选项卡中,找到 binlog_format 参数,将其设置为 “ROW”。
4. binlog_row_image
实时同步仅支持同步 MySQL 服务器 binlog_row_image 配置格式为 FULL or full。
使用如下语句查询 binlog_row_image 的使用格式。
show variables like "binlog_row_image";
如果返回非 FULL/full 请修改 binlog_row_image:
对于开源 MySQL 参考官方文档:MySQL :: MySQL 8.0 Reference Manual :: 19.1.6.4 Binary Logging Options and Variables。
对于腾讯云实例 MySQL / TDSQL-C MySQL:
登录腾讯云 MySQL / TDSQL-C MySQL 控制台,找到要开启 Binlog 的实例,单击进入该实例的详细信息页面。
在上面选项卡中选择数据库管理,找到参数设置选项卡。
在参数设置选项卡中,找到 binlog_row_image 参数,将其设置为“FULL”。
开启 GTIDs(可选)
GTID(Global Transaction Identifier, 全局事务标识),用于在 binlog 中唯一标识一个事务,使用 GTID 可以避免事务重复执行导致数据混乱或者主从不一致。
1. 检查是否开启了 GTID
show global variables like '%GTID%';
返回结果类似如下,证明已经开启 GTID。
+--------------------------+-------+ | Variable_name | Value | +--------------------------+-------+ | enforce_gtid_consistency | ON | | gtid_mode | ON | +--------------------------+-------+
2. 开启 GTID
对于开源 MySQL,参考官方文档 MySQL :: MySQL 8.0 Reference Manual :: 19.1.4.2 Enabling GTID Transactions Online。
对于腾讯云实例 MySQL / TDSQL-C MySQL,默认为开启,不支持关闭。
添加腾讯云 MySQL 数据库安全组
安全组是一种有状态的包含过滤功能的虚拟防火墙,用于设置单台或多台云数据库的网络访问控制,是腾讯云提供的重要的网络安全隔离手段。如果您使用的腾讯云 MySQL 数据库高于基础版,您需要将下列访问 IP 加到目标数据库的安全组中。如果您不配置安全组,在 WeData 中配置 MySQL 数据源的时候可能会出现连通性测试失败的情况。具体操作可参见 管理云数据库安全组。
118.89.220.0/24, 139.199.116.0/24, 140.143.68.0/24, 152.136.131.0/24, 81.70.150.0/24, 81.70.161.0/24, 81.70.195.0/24, 81.70.198.0/24, 82.156.22.0/24, 82.156.221.0/24, 82.156.23.0/24, 82.156.24.0/24, 82.156.27.0/24, 82.156.82.0/24, 82.156.84.0/24, 82.157.119.0/24
三、Iceberg 环境准备
实时集成可以根据上游表结构自动创建目标表。如果不想使用该功能,也可以在作业运行前在 Iceberg 中创建对应结构的目标表。
四、WeData 前置准备
在正式配置同步任务之前,需要创建一个项目。如下基于一个全新的环境讲解相关步骤,如果您已经创建了项目成员或完成了项目创建可以跳过。
项目创建
如果您还没有在数据开发治理平台 WeData 创建过项目空间,在进行数据同步任务之前需完成项目创建。
创建项目空间,需要进入 WeData 首页,单击项目列表 > 创建项目。 如果只是做数据集成,此处可以选择仅创建项目,输入项目标识和名称即可。
添加成员
1. 单击项目管理 > 成员与角色管理,进入成员与角色管理界面,单击添加按钮。
2. 在添加成员界面,为新创建的成员设置所需角色,设置完成后,该成员便可以进入项目了。
资源组准备
确认资源组是否配置
在正式进行数据集成任务之前,请确认您所在的项目是否已配备集成资源。
如果提示未配置集成资源,则无法正常运行数据集成任务。
当您创建完新项目,在执行资源组这一列会显示集成资源未配置,只需要前往 WeData 首页执行资源组 > 集成资源组为新项目关联资源即可。
进入集成资源组界面,点击对应资源组后的关联项目。
集成资源组购买
如果发现目前没有集成资源组,需要前往购买。集成资源组是在运行数据集成任务时专享使用到的计算资源,本资源主要以资源组形式展现。在配置同步任务之前需要确认是否购买了数据集成资源组。详情请参见 配置集成资源组。
网络连通性确认
需要保证数据源网络(包括读端、写端)与数据集成资源组之间网络互通,且资源不可因为白名单限制等原因被拒绝访问,否则无法完成数据传输同步。
数据集成资源组内包含的机器资源默认需处于同一 VPC 网络环境下:
若数据源处于 VPC 内:
若与集成资源位于同一 VPC:可直接使用。
若与集成资源位于不同 VPC:需购买 对等连接 打通集成与数据源所在 VPC。
说明:
数据源配置
MySQL 数据源配置
进入配置数据源界面,MySQL 数据源支持云实例和连接串两种连接方式。
单击项目管理 > 数据源管理 > 新建数据源 > 选择 MySQL 数据源。
通过云实例创建数据源。
参数 | 说明 |
连接类型 | 选择云实例或连接串的数据源连接形式 |
所属项目 | 当前数据源创建时的归属项目 |
数据源名称 | 新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内 |
显示名 | 数据源在产品中使用时的显示名称,不填默认显示数据源名称 |
描述 | 选填,对本数据源的描述 |
数据源权限 | 项目共享表示当前数据源项目所有成员均可使用 ,仅个人和管理员表示数据源仅创建人和项目管理员可用 |
获取实例 | 选择账户下云数据库实例所在的地域、实例名称及 ID 信息 |
数据库名 | 需要连接的数据库名称 |
用户名 | 连接数据库的用户名称 |
密码 | 连接数据库的密码 |
数据连通性(旧) | 测试是否能够连通所配置的数据库 说明: 若连通性测试不通过,数据源仍可保存。连通性测试未通过而保存但数据源不可使用。 如果连通性测试不通过,可能是因为 WeData 被数据库所在网络防火墙禁止,需要添加腾讯云 MySQL 数据库安全组,可以参考章节二 添加腾讯云 MySQL 数据库安全组。 2024年春节后新购买的 WeData 默认无此功能,该功能已升级为资源组连通性。 |
资源组连通性(新) | 支持具体资源组与数据源的连通性测试。用户只需保证自身数据源和集成资源组所在VPC网络能通即可,无需做额外的网络打通。 说明: 2024年春节前购买 WeData 的用户无此功能,若需要使用,请联系腾讯侧运维人员处理。 |
通过连接串创建数据源。
参数 | 说明 |
数据源名称 | 新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内。 |
描述 | 选填,对本数据源的描述。 |
数据源权限 | 项目共享表示当前数据源项目所有成员均可使用 ,仅个人和管理员表示该数据源仅创建人和项目管理员可用。 |
部署方式 | 支持 CDB、自建实例、公网实例三种部署方式。 |
区域与网络 | 数据源所在地域与 VPCid。 |
JDBC URL | 用于连接 MySQL 数据库的连接串信息。URL支持IP和域名形式。 |
数据库名称 | 需要连接的数据库名称。 |
用户名 | 连接数据库的用户名称。 |
密码 | 连接数据库的密码。 |
数据连通性(旧) | 测试是否能够连通所配置的数据库。 说明: 若连通性测试不通过,数据源仍可保存。连通性测试未通过而保存但数据源不可使用。 如果连通性测试不通过,可能是因为 WeData 被数据库所在网络防火墙禁止,需要添加腾讯云 MySQL 数据库安全组,可以参考章节二 添加腾讯云 MySQL 数据库安全组。 2024年春节后新购买的 WeData 默认无此功能,该功能已升级为资源组连通性。 |
资源组连通性(新) | 支持具体资源组与数据源的连通性测试。用户只需保证自身数据源和集成资源组所在VPC网络能通即可,无需做额外的网络打通。 说明: 2024年春节前购买 WeData 的用户无此功能,若需要使用,请联系腾讯侧运维人员处理。 |
Iceberg 数据源配置
进入配置数据源界面,Iceberg 数据源仅支持连接串连接方式。
单击 项目管理 > 数据源管理 > 新建数据源 > 选择 Iceberg 数据源。
通过连接串创建数据源。
参数 | 说明 |
数据源名称 | 新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内。不可修改! |
描述 | 选填,对本数据源的描述。 |
数据源权限 | 项目共享表示当前数据源项目所有成员均可使用 ,仅个人和管理员表示该数据源仅创建人和项目管理员可用。 |
区域与网络 | 数据源所在地域与 VPCid。 |
catalog | 指定 Iceberg 使用的目录服务类型 |
Hive 版本 | 指定的 Hive 版本 |
JDBC URL | 用于连接 Iceberg 数据库的连接串信息。 |
用户名 | 连接数据库的用户名称。 |
密码 | 连接数据库的密码。 |
Hive MetaDB | 选择是否启用 Hive 元数据数据库。当前要获取 Iceberg 库、表等元数据,必须启用 Hive MetaDB。 |
Metastore URL | Hive 元数据存储的服务地址,用于访问元数据存储 |
Hive MetaDB IP 及 端口 | 启用 Hive MetaDB 后,需要配置 IP 和端口。IP 和端口是用于指定 Hive MetaDB 服务器的位置和访问端口 |
数据库名称 | 启用 Hive MetaDB 后,需要配置在 Hive MetaDB 中创建的,用于存储 Hive 元数据的数据库的名称 |
用户名 | 连接到 Hive MetaDB 的数据库账户名 |
密码 | 密码是与上述用户名关联的账户密码。在配置 Hive 时,您需要提供正确的密码以验证身份并授权对 MetaDB 的访问。 |
warehouse 路径 | Iceberg 表数据存储的位置,通常是 HDFS 上的一个目录路径 |
core-site.xml 文件 | Hadoop 的核心配置文件,包含了 Hadoop 系统的运行时设置,如 HDFS 访问控制等。该参数必填! |
hdfs-site.xml 文件 | Hadoop 的 HDFS 配置文件,用于指定 HDFS 的相关配置,如副本策略、权限设置等。该参数必填! |
hive-site.xml 文件 | Hive 的配置文件,包含了 Hive 运行时的配置信息,如连接参数、存储路径等。该参数必填! |
认证方式 | 指定连接 Hive 时使用的认证方式,可以是无需认证或者使用 Kerberos 认证 |
keytab 文件 | 当启用 Kerberos 认证时,需配置 keytab 文件,keytab 文件包含了用于 Kerberos 身份验证的密钥表。 |
conf 文件 | 当启用 Kerberos 认证时,需配置 conf 文件。conf 文件是 Hive 的配置文件,其中包含了 Hive 运行所需的各种配置参数,与 Kerberos 相关的配置信息。 |
principal | 当启用 Kerberos 认证时,还需要指定 Hive 服务的 principal,以便 Kerberos 能够验证 Hive 服务的身份。 |
数据连通性(旧) | 测试是否能够连通所配置的数据库。 说明: 若连通性测试不通过,数据源仍可保存。连通性测试未通过而保存但数据源不可使用。 2024年春节后新购买的 WeData 默认无此功能,该功能已升级为资源组连通性。 |
资源组连通性(新) | 支持具体资源组与数据源的连通性测试。用户只需保证自身数据源和集成资源组所在VPC网络能通即可,无需做额外的网络打通。 说明: 2024年春节前购买 WeData 的用户无此功能,若需要使用,请联系腾讯侧运维人员处理。 |
五、实时整库:MySQL 同步至 Iceberg
当企业需要将整个数据库迁移到新的平台或环境时,整库同步可以一次性同步所有表和数据,确保数据的完整性和一致性。
步骤一:创建整库迁移任务
登录打开 WeData 后,进入数据集成模块,然后单击配置中心 > 实时同步任务页面后,单击新建整库迁移任务。
步骤二:链路选择
在链路选择界面,选择 MySQL > Iceberg 链路,选择完成后,单击下一步。
步骤三:数据来源设置
参数 | 说明 |
数据源 | 选择需要同步的 MySQL 数据源。 |
来源表 | 所有库表:监控数据源下所有库。任务运行期间新增库、表默认将同步至目标端。 指定表:此选项下需指定到具体表名称,设置后任务仅同步指定表;若需要新增同步表需停止并重启任务。 指定库:此选项下需指定具体库名、以表名正则表达式。设置后,任务运行期间符合表名表达式的新增表默认将同步至目标端。 常用表名正则表达式: 单个库的所有表: .* 单个库的部分表: table1|table2|table3 排除某些表的所有表:^(?!(exclude1|exclude2|exclude3)$).*$ 排除某些表但包括以a开头的所有表:^(?!(exclude1|exclude2|exclude3)$)(^a.*)$ |
读取模式 | 全量 + 增量:数据同步分为全量和增量同步阶段,全量阶段完成后任务进入增量阶段。全量阶段将同步库内历史数据,增量阶段从任务启动后 binlog cdc 的位点开始同步。 增量:仅从任务启动后的 binlog cdc 位点开始同步数据。 |
过滤操作 | 支持插入、更新和删除三种操作,设置后将不同步指定操作类型的数据。 |
时区 | 设置日志时间所属时区,默认上海。 |
高级设置(可选) | 可根据业务需求配置参数。 |
MySQL 整库同步过程中,任务使用主键或指定主键(无主键时情况下,需要在高级设置里面进行指定, 当前仅支持索引列)进行切分,当前仅支持以下类型:
主键支持范围类型:
TINYINT、TINYINT_UNSIGNED、SMALLINT、SMALLINT_UNSIGNED、INT、MEDIUMINT、INT_UNSIGNED、MEDIUMINT_UNSIGNED、BIGINT、BIGINT_UNSIGNED、FLOAT、DOUBLE、DECIMAL、TIME、DATE、DATETIME、TIMESTAMP、CHAR、VARCHAR、TEXT、BINARY、VARBINARY、BLOB.
步骤四:数据目标设置
参数 | 说明 |
数据源 | 选择需要同步的目标数据源。 |
写入模式 | upsert:更新方式写入目标表。此方式下要求目标表中已设置主键: 任务将默认使用主键作为唯一键进行记录更新。 若表无主键则 append 写入。 append:追加模式写入数据表。 全量 append + 增量 upsert:根据源端数据同步阶段自动切换数据写入方式,全量阶段采用 append 写入提高性能,增量阶段采用 upsert 写入进行数据实时更新。该模式当前仅支持来源端为 Mysql、Oracle、TDSQL MySQL 的数据源。 |
库表匹配策略 | Iceberg 中数据库以及数据表对象的名称匹配规则。 |
高级设置 |
步骤五:运行设置
集成资源配置:
为当前任务关联对应的集成资源组,同时设定运行时 JM、TM 规格以及任务运行并行度。其中,当前任务实际运行时实际占用 CU数 = JobManager 规格 + TaskManager 规格 × 并行度。
资源分配方式:集成资源支持多种分配方式
固定分配:此方式下不区分任务同步阶段,全量及增量同步过程中始终为当前任务分配固定资源量。此方式可避免任务间资源抢占,适用于任务运行过程中数据可能存在较大变动的场景。
按同步阶段分配:按全量和增量不同同步阶段分配计划的资源使用量,以节约整体资源用量。
表及字段变更策略:
表及字段变更类型 | 支持策略 |
新增表 | 忽略变更、自动建表、日志告警、任务出错 |
删除表 | 忽略变更、日志告警、任务出错 |
重命名表 | 忽略变更、日志告警、任务出错 |
清空表 | 忽略变更、日志告警、任务出错 |
新增列 | 自动新增列、忽略变更、日志告警、任务出错 |
删除列 | 自动删除列、忽略变更、日志告警、任务出错 |
重命名列 | 自动重命名列、忽略变更、日志告警、任务出错 |
修改列类型 | 自动修改列类型、忽略变更、日志告警、任务出错 |
如下是上表中支持的策略和表现效果:
策略名称 | 表现效果 |
忽略变更 | 1. 目标端表/字段不做任何变更 2. 任务正常运行 |
日志告警 | 1. 目标端表/字段不做任何变更 2. 任务正常运行,默认忽略变更 |
任务停止 | 1. 目标端表/字段不做任何变更 2. 任务异常重启 |
自动处理 | 目标端根据来源端自动发生变更,自动创建出目标侧对象。例如,新增表、新增列等。 说明: 为了避免误操作造成结构变化从而影响用户业务,目前仅新增相关变更,如新增表/列,支持自动处理;其余高危操作,如删除列、清空表、删除表等操作仅支持“提醒式”响应策略(例如,告警、停止) |
写入异常策略:
Iceberg 整库同步任务提供任务级运行资源及数据失败写入处理策略。其中数据写入失败处理策略支持四种:
策略名称 | 策略说明 |
部分停止 | 部分表写入异常时,仅停止该表数据写入,其他表正常同步。已停止的表不可在本次任务运行期间恢复写入。 |
异常重启 | 部分表写入异常时,所有表均暂停写入。此策略下任务将持续重启直到所有表正常同步,重启期间可能导致部分表数据重复写入。 |
忽略异常 | 忽略表内无法写入的异常数据并标记为脏数据。该表的其他数据、以及任务内的其他表正常同步。脏数据提供 COS 归档和不归档两种方案。 COS 归档:将无法写入的脏数据进行归档,需要配置 COS 数据源、存储桶、存储目录、内容分隔符及换行符。 不归档:不需要做其他操作。 |
场景示例:
任务 Task1 下计划同步50张表,任务运行过程中表 A 内出现新增字段或字段类型变更:
部分停止:表 A 任务运行后将字段 "DEMO" 的进行了字段类型变更,并且变更后字段类型与目标端字段类型无法匹配写入。此策略下,任务将在停止源端表 A 的数据读取,后续任务仅同步其余49张表至目标端。
异常重启:表 A 任务运行后将字段 "DEMO "的进行了字段类型变更,并且变更后字段类型与目标端字段类型无法匹配写入。此策略下任务将在持续重启,期间任务内配置的所有50张表将暂停数据写入,直到表A字段纠正。
忽略异常:表 A 任务运行后将字段 "DEMO" 的进行了字段类型变更,并且变更后字段类型与目标端字段类型无法匹配写入。此策略下任务将忽略无法写入的异常数据,并标记为脏数据,表内其他数据正常同步。
步骤六:配置预览
序号 | 参数 | 说明 |
1 | 提交 | 将当前任务提交至生产环境,提交时根据当前任务是否有生产态任务可选择不同运行策略: 若当前任务无生效的线上任务,即首次提交或线上任务处于“失败”状态,可直接提交。 若当前任务存在“运行中”或“暂停”状态的线上任务需选择不同策略。停止线上作业将抛弃之前任务运行位点,从头开始消费数据,保留作业状态将在重启后从之前最后消费位点继续运行。 说明: 单击立即启动任务将在提交后立即开始运行,否则需要手动触发才会正式运行。 |
2 | 锁定/解锁 | 默认创建者为首个持锁者,仅允许持锁者编辑任务配置及运行任务。若锁定者5分钟内没有编辑操作,其他人可单 击图标抢锁,抢锁成功可进行编辑操作。 |
3 | 前往运维 | 根据当前任务名称快捷跳转至任务运维页面。 |
4 | 保存 | 预览完成后,可单击保存按钮,保存整库任务配置。仅保存的情况下,任务将不会提交至运维中心。 |
步骤七:任务配置检测与提交
当完成上面六个步骤,可以进行任务配置检测和正式提交任务,此时可以单击左上角的提交按钮,进入任务配置检查和提交阶段。
步骤 | 步骤说明 | |
任务配置检测 | 本步骤将针对任务内读端、写端以及资源进行检测: 检测通过:配置无误。 检测失败:配置存在问题,需修复以进行后续配置。 检测告警:此检测为系统建议修改项,修改完成后可单击重试重新检测;或者,您可以单击忽略异常进入下一步骤不阻塞后续配置。 | |
提交策略选择 | 本步骤中可选择本次任务提交策略: 首次提交:首次提交任务支持从默认或指定点位同步数据 立即启动,从默认点位开始同步:若源端配置为“全量 + 增量”读取方式,则默认先同步存量数据(全量阶段),完成后消费 binlog 获取变更数据(增量阶段);若源端配置为 “仅增量”读取,则默认使用 binlog 最新位点开始读取。 立即启动,从指定时间点开始同步:任务将根据配置的时间及时区同步数据。若未找到指定的时间位点,任务将默认从 binlog 最早位点开始同步;若源端读取方式为“全量 + 增量”,任务将默认跳过全量阶段从增量的指定时间位点开始同步。 暂不启动:提交后暂不启动运行任务,后续可在运维列表内手动启动任务。 非首次提交:支持带运行状态启动或继续运行任务 继续运行:此策略下新版本任务提交后,将从上次同步最后位点继续运行。 重新启动,从指定位点开始:此策略下您可指定重新启动读取的位点,任务将忽略老版本从指定位点重新开始读取。若未找到指定的时间位点任务将默认从 binlog 最早位点开始同步。 重新启动,从默认位点开始运行:此策略下将根据源端配置从默认位点开始读取。若源端配置为“全量 + 增量”读取方式,则默认先同步存量数据(全量阶段),完成后即可消费 binlog 获取变更数据(增量阶段);若源端配置为 “仅增量”读取,则默认使用 binlog 最新位点开始读取。 同时,每次提交都将新生成一个实时任务版本,您可在对话框内配置版本描述。 | 1. 首次提交 2.非首次提交 |
任务提交 | 提交成功后,您可单击前往运维查看任务运行情况。 | |
步骤八:实时任务运维
对于在完成配置并提交的实时任务,您可以进入实时运维内查看并操作对应的任务。您可以在实时运维内查看任务运行的基本运行状态、任务运行指标统计、运行日志、以及配置任务异常告警等。本文列举实时同步任务的常见运维操作。
运维列表支持对所有已提交任务进行管理和启停等操作。
实时任务状态及含义:
状态类型 | 说明 |
初始化 | 任务首次提交到运行后暂未启动运行。 |
操作中 | 状态扭转中。此状态常出现在刚完成任务操作任务后,任务正在进行状态扭转。 |
运行中 | 任务正在运行中。 |
已暂停 | 当前任务已暂停运行,任务状态及读端位点被保留。 |
已停止 | 任务已通过手动方式停止运行。 说明: 用户手动操作并停止任务,任务将扭转至此状态。其他非异常情况停止。 |
实时任务支持操作及含义:
操作类型 | 说明 |
运行 | 运行当前任务启动读写 |
停止 | 手动停止当前任务。停止后,将不保留当前任务运行状态及位点 |
暂停 | 暂停运行当前任务。暂停后,保留任务状态及位点,后续支持从已完成位点继续读取。 |
继续 | 从上次暂停位点继续运行。 说明: 建议任务勿暂停过长时间,否则再次运行时可能由于源端日志过期、或位点丢失等导致重新启动失败。 |
实时任务状态及对应允许操作:
状态类型 | 操作 | |||
| 运行 | 暂停 | 继续 | 停止 |
初始化 | ✓ | - | - | - |
操作中 | - | - | - | - |
运行中 | - | ✓ | - | ✓ |
已暂停 | - | - | ✓ | ✓ |
已停止 | ✓ | - | - | - |
六、实时单表:MySQL 同步至 Iceberg
当业务只需要关注某个特定表的数据时,单表同步可以针对性地同步特定表的数据,这样可以减少不必要的数据处理和传输,提高同步效率。
在数据集成页面左侧目录栏单击实时同步,在实时同步页面上方选择单表同步新建并进入配置页面。配置单表同步时支持两种模式:单表模式和画布模式。后续以画布模式为例讲解配置流程。
步骤一:配置 MySQL 读取节点
1. 单击左侧读取,单击选择 MySQL 节点并配置节点信息。
2. 填写参数信息,您可以参考下表进行参数信息配置。
参数 | 描述 |
节点名称 | 输入 MySQL 节点名称 |
数据源 | 选择需要同步的表所在数据源 |
库 | 选择需要同步的表所在数据库 |
表 | 支持选择多个表,请保证多表 schema 一致 |
表主键 | 分库分表模式下默认表 schema 一致。系统将使用拉去第一张表的主键,请选择或输入表主键字段名称 |
格式 | 指定 mysql 日志编码格式(utf-8、gbk、Latin1、utf8mb4) |
读取模式 | 支持全量和增量两种模式 |
过滤操作 | 设置后将不同步指定操作类型的数据,支持插入、更新和删除 |
3. 预览数据字段,单击保存。
步骤二:配置 Iceberg 写入节点
1. 单击左侧写入,单击选择 Iceberg 节点并配置节点信息。
2. 填写参数信息,您可以参考下表进行参数信息配置。
参数 | 说明 |
数据源 | 需要写入的 Iceberg 数据源。 |
库 | 支持选择、或者手动输入需写入的库名称。 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需写入的表名称。 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
写入模式 | Iceberg 实时同步写入支持两种模式: append:追加写入,主键冲突时报错。 upsert:更新写入,主键冲突时更新数据。 |
唯一键 | 选择一个字段作为写入表的主键。 |
高级设置 |
3. 预览数据字段并与读取节点配置字段映射,单击保存。
步骤三:任务配置检测与提交
当完成上面的步骤,可以进行任务配置检测和正式提交任务,此时可以查看任务配置界面左上角的相关按钮进行后续提交操作。每一个按钮的位置和功能如下:
实时同步任务在配置完成后可配置运行策略并提交到生产环境中运行。目前可在任务配置页面支持保存、提交、锁定/解锁以及前往运维操作。
序号 | 参数 | 说明 |
1 | 保存 | 保存当前任务配置信息,包括数据节点配置、节点连线、任务属性和任务调度配置。 |
2 | 提交 | 将当前任务提交至生产环境,提交时根据当前任务是否有生产态任务可选择不同运行策略。 若当前任务无生效的线上任务,即首次提交或线上任务处于“失败”状态,可直接提交。 若当前任务存在“运行中”或“暂停”状态的线上任务需选择不同策略。停止线上作业将抛弃之前任务运行位点,从头开始消费数据,保留作业状态将在重启后从之前最后消费位点继续运行。 说明: 单击立即启动任务将在提交后立即开始运行,否则需要手动触发才会正式运行。 |
3 | 锁定/解锁 | 默认创建者为首个持锁者,仅允许持锁者编辑任务配置及运行任务。若锁定者5分钟内没有编辑操作,其他人可单击图标抢锁,抢锁成功可进行编辑操作。 |
4 | 前往运维 | 根据当前任务名称快捷跳转至任务运维页面。 |
6 | 画布转换 | 将当前编辑模式转换成画布模式 |
任务提交检测
参数 | 说明 |
检测存在异常 | 支持跳过异常直接提交,或者终止提交。 |
检测仅存在警告及以下 | 可直接提交。 |
提交结果
任务提交中:
展示提交进度百分比。
提示用户勿刷新/关闭页面,文案:当前任务已提交成功,可前往运维进行任务状态及数据管理。
任务提交结果-成功:
展示任务提交成功结果。
提示成功及后续跳转:文案 “提交成功,10秒后将跳转至当前任务运维详情页面” “当前任务已提交成功,可前往运维进行任务状态及数据管理”。
展示任务提交失败原因:
失败原因返回。
步骤四:实时任务运维
对于在完成配置并提交的实时任务,您可以进入实时运维内查看并操作对应的任务。您可以在实时运维内查看任务运行的基本运行状态、任务运行指标统计、运行日志、以及配置任务异常告警等。本文列举实时同步任务的常见运维操作。
运维列表支持对所有已提交任务进行管理和启停等操作。
七、实时节点高级参数
在配置读写节点时,可以配置高级设置,配置内容和使用场景如下所示:
MySQL 类型节点级别
如下为 MySQL 读取节点在使用场景下支持的高级设置及其描述:
读/写 | 适用场景 | 配置内容: | 描述 |
读 | 单表 + 整库 | scan.newly-added-table.enabled=true | 参数描述: 设置这个参数,在暂停 > 继续后可以感知新增的表。默认是 false 1. 全增量同步时使用该参数,新增的表会读取存量数据后再读取增量数据 2. 增量同步时使用该参数,新增的表只会读取增量数据 |
读 | 单表 + 整库 | scan.incremental.snapshot.chunk.size=20000 | 参数描述: 对于数据分布均匀的任务,这个参数代表一个 chunk 内大约的条数,可以用总的数据量除以 chunk size 估算任务有多少个 chunk 数,chunk 数的多少影响了 jobmanager 是否 oom,目前 2CU 的情况下可以支持10w多 chunk,如果数据量太大,我们可以调大 chunk size 来减少 chunk 数量 注意事项: 大数据量任务(例如,总数据量1个亿以上,单条记录大于0.1M)一般建议设置20000 |
读 | 单表 + 整库 | split-key.even-distribution.factor.upper-bound=10.0d | 参数描述: mysql 存量数据读取阶段,如果数据比较离散、主键字段的最大值超大,可以修改这个参数,来使用非均匀分割,减少由于主键值超大情况下导致 chunk 数量太大从而 jm oom 的问题 注意事项: 默认值为10.0d 一般不用修改 |
读 | 单表 + 整库 | debezium.query.fetch.size=0 | 参数描述: 代表每次读取从数据库拉取的数据条数,默认是0代表jdbc 默认的 fetch size 注意事项: 1. 大任务(例如,总数据量1个亿以上,单条记录大于0.1M)只有一个读取实例时建议取1024条 2. 任务存在多个读取实例时建议降低这个值,减少内存消耗,建议取512条 |
读 | 单表 + 整库 | debezium.max.queue.size=8192 | 参数描述: 属性定义了内部队列中存储的最大事件数。如果达到此限制,Debezium 将暂停读取新事件,直到处理和提交尚未处理的事件。这个属性可以帮助避免过多事件积压在队列中,导致内存耗尽和性能下降。默认是8192 注意事项: 1. 大任务(例如,总数据量1个亿以上,单条记录大于0.1M)只有一个读取实例建议取4096 2. 任务存在多个读取实例时建议降低这个值,减少内存消耗,建议取1024 |
Iceberg 类型节点级别
如下为 Iceberg 写入节点在使用场景下支持的高级设置及其描述:
读/写 | 适用场景 | 配置内容: | 描述 |
写 | 整库 | switch.append.upsert.enable | 参数描述: 设置该参数,默认 1. 全量阶段使用 Append 模式写入,减小合并的开销 2. 增量阶段转为 Upsert 的模式写入。 默认为 true。 |
写 | 整库 | sink.partitioner | 参数描述: 指定 Source 和 Sink 端的数据交换方式。当表的数量较多时,可以使用 forward 来提高性能,当表的数量小于作业并发数时,可以设置为 rebalance 来充分利用多个并发的资源。 默认为 forward |
配置读写节点高级参数时,可以参考下图引导:
说明:
1. 一个参数一行;若需配合使用的参数写在一行内。
2. 每个参数带默认值。
八、常见问题
1. MySql serverid 冲突
错误信息:
com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master。
解决办法:目前已经优化增加随机生成 serverid,之前的任务中如果在 mysql 高级参数中显示指定了 server-id 建议删除,因为可能多个任务使用了相同的数据源,并且 server-id 设置的相同导致冲突。
2. 报 binlog 文件找不到错误信息:
错误信息:
Caused by: org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.xxx', pos=xxx, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed。
错误原因:
作业正在读取的 binlog 文件在 MySQL 服务器已经被清理时,会产生报错。导致 Binlog 清理的原因较多,可能是 Binlog 保留时间设置的过短;或者作业处理的速度追不上 Binlog 产生的速度,超过了 MySQL Binlog 文件的最大保留时间,MySQL 服务器上的 Binlog 文件被清理,导致正在读的 Binlog 位点变得无效。
解决办法:如果作业处理速度无法追上 Binlog 产生速度,可以考虑增加 Binlog 的保留时间也可以优化作业减轻反压来加速 source 消费。如果作业状态没有异常,可能是数据库发生了其他操作导致 Binlog 被清理,从而无法访问,需要结合 MySQL 数据库侧的信息来确定 Binlog 被清理的原因。
3. MySQL 报连接被重置
错误信息:
EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset。
错误原因:
1. 网络问题。
2. 作业存在反压,导致 source 无法读取数据,binlog client 空闲,如果 binlog 连接在超时后仍然空闲 mysql 服务器会断开空闲的连接。
解决方法:
1. 如果是网络问题,可以调大 mysql 网络参数 set global slave_net_timeout = 120; (默认30s) set global thread_pool_idle_timeout = 120。
2. 如果是作业反压导致,可以通过调节作业减轻反压,例如增加并行度,提升写入速度,提升 taskmanager 内存减少 gc。
4. Mysql2dlc 任务 JobManager Oom
错误信息:
1. 用户数据量比较大,可以调大 jobmanager CU 数,使用 mysql 高级参数 scan.incremental.snapshot.chunk.size 调大 chunk size 大小,默认是8096。
2. 用户数据量不大,但是主键最大值-最小值的差值却很大,导致使用均分 chunk 的策略时划分很多 chunk,修改分布因子,让用户数据走非均匀的数据切分逻辑,split-key.even-distribution.factor.upper-bound=5.0d,默认值分布因子已经修改为10.0d。
5. 用户的 binlog 数据格式不对,导致 debezium 解析异常
错误信息:
ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.000044/211839464. 2023-02-20 21:37:28.480 [blc-172.17.48.3:3306] ERROR io.debezium.pipeline.ErrorHandler [] - Producer failure io.debezium.DebeziumException: Error processing binlog event.
解决方法:
修改 binlog_row_image=full 后需要重启数据库。
6. 是否支持 gh-ost?
支持,不会迁移 Online DDL 变更产生的临时表数据,只迁移源库使用 gh-ost 执行的原始 DDL 数据,同时您可以使用默认的或者自行配置 gh-ost 影子表和无用表的正则表达式。