MySQL 环境准备与数据库配置
如果您想使用 MySQL 进行离线数据同步操作,需要先确认 MySQL 数据源版本支持情况及支持的读取数据类型转换。
支持版本
需遵循以下版本限制:
类型 | 版本 | Driver |
MySQL | 5.6,5.7,8.0.x | JDBC Driver:8.0.21 |
确认 MySQL 版本
数据集成对 MySQL 版本有要求,查看当前待同步的 MySQL 是否符合版本要求。您可以在 MySQL 数据库通过如下语句查看当前 MySQL 数据库版本。
select version();
设置 MySQL 服务器权限
您可以定义一个具有适当权限的 MySQL 用户。
1. 创建 MySQL 用户(可选):
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
2. 向用户授予所需的权限:
在离线同步数据的情况下,该账号必须拥有数据库的 SELECT 权限。执行命令可以参考以下示例:
mysql> GRANT SELECT ON *.* TO 'user'@'localhost';
3. 刷新用户的权限:
mysql>FLUSH PRIVILEGES;
数据源配置
进入配置数据源界面,MySQL 数据源支持云实例和连接串两种连接方式。
单击项目管理 > 数据源管理 > 新建数据源 > 选择 MySQL 数据源。
通过连接串创建数据源。
参数 | 说明 |
数据源名称 | 新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内。 |
描述 | 选填,对本数据源的描述。 |
数据源权限 | 项目共享表示当前数据源项目所有成员均可使用 ,仅个人和管理员表示该数据源仅创建人和项目管理员可用。 |
部署方式 | 支持自建实例、公网实例两种部署方式,其中自建实例为在腾讯云服务器上部署的数据源实例,公网实例为在客户本地 IDC 或其他云上资源实例,支持通过公网进行访问连接。 |
区域与网络 | 当选择自建实例时,需要选择数据源实例所在地域与 vpcID。 |
JDBC URL | 用于连接 MySQL 数据源实例的连接串信息,包含 host ip、port、数据库名称等信息。 |
用户名 | 连接数据库的用户名称。 |
密码 | 连接数据库的密码。 |
数据连通性 | 测试是否能够连通所配置的数据库。 说明: 若连通性测试不通过,数据源仍可保存。连通性测试未通过而保存但数据源不可使用。 如果连通性测试不通过,可能是因为 WeData 被数据库所在网络防火墙禁止,需要添加 腾讯云 MySQL 数据库安全组。 |
MySQL 离线单表读取节点配置
参数 | 说明 |
数据来源 | 可用的 MySQL 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称。 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需读取的表名称。 分表情况下,可在 mysql 源端支持选择或输入多个表名称,多个表需保证结构一致。 分表情况下,支持配置表序号区间。例如'table_[0-99]'表示读取'table_0'、'table_1'、'table_2'直到'table_99' ; 如果您的表数字后缀的长度一致,例如'table_000'、'table_001'、'table_002'直到'table_999',您可以配置为'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]' 。 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
添加分库分表 | 适用于分库场景,单击后可配置多个数据源、库及表信息。分库分表场景下需保证所有表结构一致,任务配置将默认展示并使用第一个表结构进行数据获取。 |
切割键 | 指定用于数据分片的字段,指定后将启动并发任务进行数据同步,提升数据同步效率。您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。 |
筛选条件(选填) | 在实际业务场景中,通常会选择当天的数据进行同步,将 where 条件指定为 gmt_create>$bizdate。where 条件可以有效地进行业务增量同步。 如果不填写 where 语句,包括不提供 where 的 key 或 value,数据同步均视作同步全量数据。 不可以将 where 条件指定为 limit 10,这不符合 MySQL WHERE 子句约束。 |
高级设置(选填) | 可根据业务需求配置参数。 |
MySQL 离线单表写入节点配置
参数 | 说明 |
数据去向 | 需要写入的 MySQL 数据源。 |
写入类型 | 单表写入:支持写入单表 分库分表写入:支持一次性写入同一个数据源内分库分表的 MySQL 表中,分库分表写入请保证所有表 schema 结构一致、且所有目标表存在。 |
库 | 支持选择、或者手动输入需写入的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需写入的表名称 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。 在数据集成网络连通的情况下,仍可进行数据同步。 |
是否清空表 | 在写入该 MySQL 数据表前可以手动选择是否清空该数据表。 |
写入模式 | MySQL 写入支持三种模式: Append: 当主键/唯一性索引冲突时,冲突行无法写入 。 Overwrite:主键/唯一性索引冲突时,会先删除原有行,再插入新行 。 On duplicate key:主键/唯一性索引冲突时,新行会更新已指定的字段。指定的字段是指在同步任务的字段映射配置处所添加的字段。 |
批量提交大小 | 一次性批量提交的记录数大小,该值可以极大减少数据同步系统与 MySQL 的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程 OOM 异常。 |
前置 SQL(选填) | 执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,执行前清空表中的旧数据(truncate table tablename)。 |
后置 SQL(选填) | 执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,加上某一个时间戳 alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。 |
数据类型转换支持
读取
MySQL 读取支持的字段类型及类型转换对应关系如下(在处理 MySQL 时,会将 MySQL 数据源的数据类型和数据处理引擎的数据类型做映射):
MySQL 数据类型 | 内部类型 |
tinyint, smallint, mediumint, int, bigint,year | Long |
float, double, decimal | Double |
varchar, char, tinytext, text, mediumtext, longtext,set,json | String |
date, datetime, timestamp, time | Date |
bit, bool | Boolean |
tinyblob, mediumblob, blob, longblob, varbinary | Bytes |
写入
MySQL 写入支持的字段类型及类型转换对应关系如下:
内部类型 | MySQL 数据类型 |
Long | tinyint, smallint, mediumint, int, bigint,year |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext,set,json |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
常见问题
1. 使用 COLLATE 指定排序规则可能会造成部分数据同步重复
问题原因:
COLLATE 用于指定字段排序规则,COLLATE 会影响 ORDER BY 语句的顺序,影响到 WHERE 条件中大于小于号筛选结果,以及 DISTINCT、GROUP BY、HAVING 等查询结果。涉及到字符类型比较或排序都与 COLLATE 有关。
解决方案:
集成任务设置分割键时选择非 COLLATE 的列。
MySQL 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
配置单库单表
"job": {"content": [{"reader": {"parameter": {"password": "******","column": [ //列名"id","name"],"connection": [{"jdbcUrl": ["jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true"],"table": [ //源表"source_table"]}],"where": "id>10", //筛选条件"splitPk": "id", //切割键"username": "root"},"name": "mysqlreader"},"transformer": [],"writer": {"parameter": {"postSql": [ //后置sql""],"password": "******","session": ["set session sql_mode='ANSI'"],"column": [ //列名"id","name"],"connection": [{"jdbcUrl": "jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true","table": [ //目标表"sink_table"]}],"writeMode": "replace", //写入模式"batchSize": 1024, //批量提交大小"username": "root","on": [ // 唯一性索引,写入过程中发生冲突时不同的写入模式有不同的写入策略"id"],"preSql": [ //前置sql"truncate table sink_table"]},"name": "mysqlwriter"}}],"setting": {"errorLimit": {"record": 0,//脏数据阈值"percentage": 0.02 //脏数据百分比 1表示100%},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}
配置分库分表
"job": {"content": [{"reader": {"parameter": {"password": "******", //可以为空,作为必填参数之一"column": [ //列名,需要保证所有表的结构相同"id","name"],"connection": [{"password": "******", //第一张表所在数据源的password"jdbcUrl": ["jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true"],"table": ["source_table_1"],"username": "root"},{"password": "******", //第二张表所在数据源的password"jdbcUrl": ["jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true"],"table": ["source_table_2"],"username": "root"}],"splitPk": "id", //切割键"username": "root" //必填参数之一},"name": "mysqlreader"},"transformer": [],"writer": {"parameter": {"postSql": [ //后置sql""],"password": "******","session": ["set session sql_mode='ANSI'"],"column": [ //列名"id","name"],"connection": [{"jdbcUrl": "jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true","table": [ //目标表"sink_table"]}],"writeMode": "replace", //写入模式"batchSize": 1024, //批量提交大小"username": "root","on": [ //唯一性索引,写入过程中发生冲突时不同的写入模式有不同的写入策略"id"],"preSql": [ //前置sql""]},"name": "mysqlwriter"}}],"setting": {"errorLimit": {"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}