OceanBase 数据源

最近更新时间:2024-09-29 17:41:33

我的收藏
说明:
注意 OceanBase 数据源需开白使用。

OceanBase 离线单表读取节点配置




参数
说明
数据源模式
支持 MySQL 兼容模式、Oracle 兼容模式
数据来源
可用的 OceanBase 数据源。
支持选择、或者手动输入需读取的库名称
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需读取的表名称。
添加分库分表
适用于分库场景,单击后可配置多个数据源、库及表信息。分库分表场景下需保证所有表结构一致,任务配置将默认展示并使用第一个表结构进行数据获取。
切割键
您可以将源数据表中某一列作为切割键,建议使用主键或有索引的列作为切割键,仅支持类型为整型的字段。
读取数据时,根据配置的字段进行数据分片,实现并发读取,可以提升数据同步效率。
筛选条件(选填)
根据数据类型填写对应筛选语句,该语句会作为将要同步数据的筛选条件,暂时不支持 limit 关键字过滤,SQL 语法与选择的数据源一致。

OceanBase 离线单表写入节点配置




参数
说明
数据源模式
支持 MySQL 兼容模式、Oracle 兼容模式
数据去向
可用的 OceanBase 数据源。
写入类型
(MySQL 兼容模式)
单表写入:支持写入单表
分库分表写入:支持一次性写入同一个数据源内分库分表的 MySQL 表中,分库分表写入请保证所有表 schema 结构一致、且所有目标表存在。
支持选择、或者手动输入需读取的库名称
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需读取的表名称。
是否清空表
在写入该 MySQL 数据表前可以手动选择是否清空该数据表。
写入模式
MySQL 兼容模式下写入支持三种模式:
Append: 当主键/唯一性索引冲突时,冲突行无法写入 。
Overwrite:主键/唯一性索引冲突时,会先删除原有行,再插入新行 。
On duplicate key:主键/唯一性索引冲突时,新行会替换已指定的字段的语句。
Oracle 兼容模式下写入支持两种模式:
Append:追加写入。
Upsert:根据设置主键字段进行数据更新写入。
批量提交大小
一次性批量提交的记录数大小,该值可以极大减少数据同步系统与 OceanBase 的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程 OOM 异常。
前置 SQL(选填)
执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,执行前清空表中的旧数据(truncate table tablename)。
后置 SQL(选填)
执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,加上某一个时间戳 alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。

OceanBase 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数
"job": {
"content": [
{
"reader": {
"parameter": {
"password": "******",
"column": [ //列名
"id",
"user_id"
],
"connection": [
{
"jdbcUrl": [
"jdbc:oceanbase://ip:9527/source_database"
],
"table": [ //源表
"source_database.source_table"
]
}
],
"where": "id>10", //筛选条件
"splitPk": "id", //切割键
"username": "root"
},
"name": "oceanbasev10reader"
},
"transformer": [],
"writer": {
"parameter": {
"postSql": [ //后置sql
""
],
"password": "******",
"session": [
"set session sql_mode='ANSI'"
],
"column": [ //列名
"id",
"user_id"
],
"connection": [
{
"jdbcUrl": "jdbc:oceanbase://ip:9527/sink_database",
"table": [ //目标表
"sink_database.\\"sink_table\\""
]
}
],
"writeMode": "upsert", //写入模式
"batchSize": 1024, //批量提交大小
"username": "root",
"on": [ //唯一性索引
"id"
],
"preSql": [ //前置sql
""
]
},
"name": "oceanbasev10writer"
}
}
],
"setting": {
"errorLimit": { //脏数据阈值
"record": 0
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}