MySQL 数据源

最近更新时间:2024-09-29 17:41:32

我的收藏

MySQL 环境准备与数据库配置

如果您想使用 MySQL 进行离线数据同步操作,需要先确认 MySQL 数据源版本支持情况及支持的读取数据类型转换。

支持版本

需遵循以下版本限制:
类型
版本
Driver
MySQL
5.6,5.7,8.0.x
JDBC Driver:8.0.21

确认 MySQL 版本

数据集成对 MySQL 版本有要求,查看当前待同步的 MySQL 是否符合版本要求。您可以在 MySQL 数据库通过如下语句查看当前 MySQL 数据库版本。
select version();

设置 MySQL 服务器权限

您可以定义一个具有适当权限的 MySQL 用户。
1. 创建 MySQL 用户(可选):
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
2. 向用户授予所需的权限:
在离线同步数据的情况下,该账号必须拥有数据库的 SELECT 权限。执行命令可以参考以下示例:
mysql> GRANT SELECT ON *.* TO 'user'@'localhost';
3. 刷新用户的权限:
mysql>FLUSH PRIVILEGES;

数据源配置

进入配置数据源界面,MySQL 数据源支持云实例和连接串两种连接方式。
单击项目管理 > 数据源管理 > 新建数据源 > 选择 MySQL 数据源
通过连接串创建数据源。



参数
说明
数据源名称
新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内。
描述
选填,对本数据源的描述。
数据源权限
项目共享表示当前数据源项目所有成员均可使用 ,仅个人和管理员表示该数据源仅创建人和项目管理员可用。
部署方式
支持自建实例、公网实例两种部署方式,其中自建实例为在腾讯云服务器上部署的数据源实例,公网实例为在客户本地 IDC 或其他云上资源实例,支持通过公网进行访问连接。
区域与网络
当选择自建实例时,需要选择数据源实例所在地域与 vpcID。
JDBC URL
用于连接 MySQL 数据源实例的连接串信息,包含 host ip、port、数据库名称等信息。
用户名
连接数据库的用户名称。
密码
连接数据库的密码。
数据连通性
测试是否能够连通所配置的数据库。
说明:
若连通性测试不通过,数据源仍可保存。连通性测试未通过而保存但数据源不可使用。
如果连通性测试不通过,可能是因为 WeData 被数据库所在网络防火墙禁止,需要添加 腾讯云 MySQL 数据库安全组

MySQL 离线单表读取节点配置




参数
说明
数据来源
可用的 MySQL 数据源。
支持选择、或者手动输入需读取的库名称。
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需读取的表名称。
分表情况下,可在 mysql 源端支持选择或输入多个表名称,多个表需保证结构一致。
分表情况下,支持配置表序号区间。例如'table_[0-99]'表示读取'table_0'、'table_1'、'table_2'直到'table_99' ; 如果您的表数字后缀的长度一致,例如'table_000'、'table_001'、'table_002'直到'table_999',您可以配置为'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]' 。
当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。
添加分库分表
适用于分库场景,单击后可配置多个数据源、库及表信息。分库分表场景下需保证所有表结构一致,任务配置将默认展示并使用第一个表结构进行数据获取。
切割键
指定用于数据分片的字段,指定后将启动并发任务进行数据同步,提升数据同步效率。您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。
筛选条件(选填)
在实际业务场景中,通常会选择当天的数据进行同步,将 where 条件指定为 gmt_create>$bizdate。where 条件可以有效地进行业务增量同步。
如果不填写 where 语句,包括不提供 where 的 key 或 value,数据同步均视作同步全量数据。
不可以将 where 条件指定为 limit 10,这不符合 MySQL WHERE 子句约束。
高级设置(选填)
可根据业务需求配置参数。

MySQL 离线单表写入节点配置




参数
说明
数据去向
需要写入的 MySQL 数据源。
写入类型
单表写入:支持写入单表
分库分表写入:支持一次性写入同一个数据源内分库分表的 MySQL 表中,分库分表写入请保证所有表 schema 结构一致、且所有目标表存在。
支持选择、或者手动输入需写入的库名称
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需写入的表名称
当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。
在数据集成网络连通的情况下,仍可进行数据同步。
是否清空表
在写入该 MySQL 数据表前可以手动选择是否清空该数据表。
写入模式
MySQL 写入支持三种模式:
Append: 当主键/唯一性索引冲突时,冲突行无法写入 。
Overwrite:主键/唯一性索引冲突时,会先删除原有行,再插入新行 。
On duplicate key:主键/唯一性索引冲突时,新行会更新已指定的字段。指定的字段是指在同步任务的字段映射配置处所添加的字段。
批量提交大小
一次性批量提交的记录数大小,该值可以极大减少数据同步系统与 MySQL 的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程 OOM 异常。
前置 SQL(选填)
执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,执行前清空表中的旧数据(truncate table tablename)。
后置 SQL(选填)
执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,加上某一个时间戳 alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。

数据类型转换支持

读取

MySQL 读取支持的字段类型及类型转换对应关系如下(在处理 MySQL 时,会将 MySQL 数据源的数据类型和数据处理引擎的数据类型做映射):
MySQL 数据类型
内部类型
tinyint, smallint, mediumint, int, bigint,year
Long
float, double, decimal
Double
varchar, char, tinytext, text, mediumtext, longtext,set,json
String
date, datetime, timestamp, time
Date
bit, bool
Boolean
tinyblob, mediumblob, blob, longblob, varbinary
Bytes

写入

MySQL 写入支持的字段类型及类型转换对应关系如下:
内部类型
MySQL 数据类型
Long
tinyint, smallint, mediumint, int, bigint,year
Double
float, double, decimal
String
varchar, char, tinytext, text, mediumtext, longtext,set,json
Date
date, datetime, timestamp, time
Boolean
bit, bool
Bytes
tinyblob, mediumblob, blob, longblob, varbinary

常见问题

1. 使用 COLLATE 指定排序规则可能会造成部分数据同步重复

问题原因
COLLATE 用于指定字段排序规则,COLLATE 会影响 ORDER BY 语句的顺序,影响到 WHERE 条件中大于小于号筛选结果,以及 DISTINCT、GROUP BY、HAVING 等查询结果。涉及到字符类型比较或排序都与 COLLATE 有关。
解决方案:
集成任务设置分割键时选择非 COLLATE 的列。

MySQL 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数

配置单库单表

"job": {
"content": [
{
"reader": {
"parameter": {
"password": "******",
"column": [ //列名
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true"
],
"table": [ //源表
"source_table"
]
}
],
"where": "id>10", //筛选条件
"splitPk": "id", //切割键
"username": "root"
},
"name": "mysqlreader"
},
"transformer": [],
"writer": {
"parameter": {
"postSql": [ //后置sql
""
],
"password": "******",
"session": [
"set session sql_mode='ANSI'"
],
"column": [ //列名
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true",
"table": [ //目标表
"sink_table"
]
}
],
"writeMode": "replace", //写入模式
"batchSize": 1024, //批量提交大小
"username": "root",
"on": [ // 唯一性索引,写入过程中发生冲突时不同的写入模式有不同的写入策略
"id"
],
"preSql": [ //前置sql
"truncate table sink_table"
]
},
"name": "mysqlwriter"
}
}
],
"setting": {
"errorLimit": {
"record": 0//脏数据阈值
"percentage": 0.02 //脏数据百分比 1表示100%
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}

配置分库分表

"job": {
"content": [
{
"reader": {
"parameter": {
"password": "******", //可以为空,作为必填参数之一
"column": [ //列名,需要保证所有表的结构相同
"id",
"name"
],
"connection": [
{
"password": "******", //第一张表所在数据源的password
"jdbcUrl": [
"jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true"
],
"table": [
"source_table_1"
],
"username": "root"
},
{
"password": "******", //第二张表所在数据源的password
"jdbcUrl": [
"jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true"
],
"table": [
"source_table_2"
],
"username": "root"
}
],
"splitPk": "id", //切割键
"username": "root" //必填参数之一
},
"name": "mysqlreader"
},
"transformer": [],
"writer": {
"parameter": {
"postSql": [ //后置sql
""
],
"password": "******",
"session": [
"set session sql_mode='ANSI'"
],
"column": [ //列名
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://ip:3306/database?rewriteBatchedStatements=true&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoReconnect=true",
"table": [ //目标表
"sink_table"
]
}
],
"writeMode": "replace", //写入模式
"batchSize": 1024, //批量提交大小
"username": "root",
"on": [ //唯一性索引,写入过程中发生冲突时不同的写入模式有不同的写入策略
"id"
],
"preSql": [ //前置sql
""
]
},
"name": "mysqlwriter"
}
}
],
"setting": {
"errorLimit": {
"record": 0
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}