TBase 离线单表读取节点配置
参数 | 说明 |
数据来源 | 可用的 TBase 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
Schema | 支持选择、或者手动输入需读取的 Schema 名称。 |
表 | 支持选择、或者手动输入需读取的表名称。支持同时读取多张表。当配置为多张表时,您需要保证多张表的 schema 结构一致。 |
切割键 | Tbase 进行数据抽取时,如果指定 splitPk,表示您希望使用 splitPk 代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效能。 推荐 splitPk 用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。 目前 splitPk 仅支持整型数据切分,不支持字符串、浮点和日期等其它类型 。如果您指定其它非支持类型,则忽略 splitPk 功能,使用单通道进行同步。 如果设置 splitPk 值为空,底层将视作您不允许对单表进行切分,因此使用单通道进行抽取。 |
筛选条件(选填) | 根据数据类型填写对应筛选语句,该语句会作为将要同步数据的筛选条件。
Tbase 根据指定的 where 条件拼接 SQL,并根据该 SQL 进行数据抽取。例如在测试时,可以将 where 条件指定为 limit 10。在实际业务场景中,通常会选择当天的数据进行同步,将 where 条件指定为 gmt_create > $bizdate。 where 条件可以有效地进行业务增量同步。 where 条件为空,视作同步全表所有的信息。 |
TBase 离线单表写入节点配置
参数 | 说明 |
数据源 | 需要写入的 TBase 数据源。 |
库 | 支持选择、或者手动输入需写入的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
Schema | 支持选择、或者手动输入需读取的 Schema 名称。 |
表 | 支持选择、或者手动输入需写入的表名称 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
是否清空表 | 在写入该 TBase 数据表前可以手动选择是否清空该数据表。 |
写入模式 | TBase 支持两种写入模式: append:追加写入,主键冲突时报错。 upsert:更新写入,主键冲突时更新数据。 |
批量提交大小 | 一次性批量提交的记录数大小,该值可以极大减少数据同步系统与 TBase 的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程 OOM 异常。 |
前置 SQL(选填) | 执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,执行前清空表中的旧数据(truncate table tablename)。 |
后置 SQL(选填) | 执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,加上某一个时间戳 alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。 |
数据类型转换支持
TBase 写入支持的数据类型及转换对应关系如下(在处理 TBase 时,会先将 TBase 数据源的数据类型和数据处理引擎的数据类型做映射):
写入
内部类型 | TBase 数据类型 |
Long | INTEGER、SMALLINT、BIGINT |
Double | DECIMAL、FLOAT、DOUBLE、REAL、NUMERIC |
String | CHAR、VARCHAR、ARRAY |
Date | DATE、TIMESTAMP、TIME |
Boolean | BOOL |
TBase 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"password": "******","column": [//列名"id","name"],"connection": [{"jdbcUrl": ["jdbc:postgresql://ip:9000/database?reWriteBatchedInserts=true"],"table": [ //源表"schema.source_table"]}],"where": "id>10", //筛选条件"splitPk": "id", //切割键"username": "root"},"name": "postgresqlreader"},"transformer": [],"writer": {"parameter": {"postSql": [ //后置sql""],"password": "******","pkColumn": "id", //写入模式选择upsert时,需要指定主键"column": [ //列名"id","name"],"connection": [{"jdbcUrl": "jdbc:postgresql://ip:9000/database?reWriteBatchedInserts=true","table": ["schema.sink_table"]}],"writeMode": "upsert", //写入模式"batchSize": 1024, //批量提交大小"username": "root","preSql": [ //前置sql""]},"name": "tbasewriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}