使用限制
1. Kudu reader 必须配置 upperBound 和 lowerBound,并发设置才有效。
2. upperBound 和 lowerBound 在 kudu reader 中为 long 型,因此只有时间类型或整型字段才支持被设置为 Bound。
3. reader 利用 kudu-client 直连 Kudu server 读取数据,where 条件不支持 Impala SQL 语法。
4. 增量同步时,where 条件语法:
create_time>= '${yyyy-MM-dd-1d HH:mm:ss}' and create_time < '${yyyy-MM-dd HH:mm:ss}'
Bound 目前支持整型和日期函数配置。日期函数配置使用方法:
// 转化为13位时间戳(毫秒)TimestampMillis('yyyy-MM-ddTHH:mm:00+0800')TimestampMillis('2023-07-10T00:00:00+0800')TimestampMillis('2023-07-10 00:00:00')TimestampMillis('2023-07-10')// 转化为10位时间戳(秒)TimestampSeconds('yyyy-MM-ddTHH:mm:00+0800')TimestampSeconds('2023-07-10T00:00:00+0800')TimestampSeconds('2023-07-10 00:00:00')TimestampSeconds('2023-07-10')
Kudu 离线单表读取节点配置
参数 | 说明 |
数据来源 | 可用的 Kudu 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称。 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需读取的表名称。 |
切割键 | 指定用于数据分片的字段,指定后将启动并发任务进行数据同步。您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键。 |
筛选条件(选填) | 在实际业务场景中,通常会选择当天的数据进行同步,将 where 条件指定为 gmt_create>$bizdate。where 条件可以有效地进行业务增量同步。如果不填写 where 语句,包括不提供 where 的 key 或 value,数据同步均视作同步全量数据。 |
upperBound | 分区上限。 若 sql 建表语句中 partition “5”<= values <=“10”,则 lowerbound为“5” ,upperbound 为“10”; 若 sql 建表语句中 partition value =“x”,则 lowerbound 为“x”,upperbound 为“x\\000”; |
lowerBound | 分区下限。 若 sql 建表语句中 partition “5”<= values <=“10”,则 lowerbound为“5” ,upperbound 为“10”; 若 sql 建表语句中 partition value =“x”,则 lowerbound为“x”,upperbound 为“x\\000”; |
Kudu 离线单表写入节点配置
参数 | 说明 |
数据去向 | 需要写入的 Kudu 数据源。 |
库 | 支持选择、或者手动输入需写入的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需写入的表名称 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
是否清空表 | 在写入该 Kudu 数据表前可以手动选择是否清空该数据表。 |
写入模式 | Kudu 写入支持三种模式: Append: 当主键/唯一性索引冲突时,冲突行无法写入 Overwrite:主键/唯一性索引冲突时,会先删除原有行,再插入新行 On duplicate key:主键/唯一性索引冲突时,新行会替换已指定的字段的语句 |
批量提交大小 | 一次性批量提交的记录数大小,该值可以极大减少数据同步系统与 Kudu 的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程 OOM 异常。 |
数据类型转换支持
读取
Kudu 数据类型 | 内部类型 |
int8、int16、int32、int64 | Long |
float、double、decimal | Double |
string、varchar | String |
unixtime_micors、date | Date |
binary | Bytes |
bool | Boolean |
写入
内部类型 | Kudu 数据类型 |
Long | int8、int16、int32、int64 |
Double | float、double、decimal |
String | string、date |
Date | unixtime_micors、varchar |
Bytes | binary |
Boolean | bool |
Kudu 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"masterAddress": "ip:7051,ip:7051,ip:7051", //支持多ip"upperBound": "10", //分区上限"column": [ //列名"id","name"],"where": [ //筛选条件"id>10"],"lowerBound": "5", //分区下限"table": "database.source_table" //源表},"name": "kudureader"},"transformer": [],"writer": {"parameter": {"kuduConfig": {"kudu.master_addresses": "ip:7051,ip:7051,ip:7051" //支持多ip},"column": [ //列名{"name": "id","type": "int"},{"name": "name","type": "string"}],"writeMode": "append", //写入模式"batchSize": 1024, //批量提交大小"encoding": "UTF-8", //编码方式"table": "database.sink_table" //目标表},"name": "kuduwriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}