StarRocks 离线单表读取节点配置
参数 | 说明 |
数据来源 | 选择来源端的已配置 StarRocks 数据源 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需读取的表名称 当数据源网络不联通导致无法直接拉取库信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
切割键 | 指定用于数据分片的字段,指定后将启动并发任务进行数据同步。您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键。 |
筛选条件(选填) | 根据数据类型填写对应筛选语句,该语句会作为将要同步数据的筛选条件。 |
高级设置(选填) | 可根据业务需求配置参数。 |
StarRocks 离线单表写入节点配置
参数 | 说明 |
数据去向 | 选择目标端的已配置 StarRocks 数据源 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表 | 支持选择、或者手动输入需读取的表名称 当数据源网络不联通导致无法直接拉取库信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
表覆盖 | 开启后,SR 写入将支持表级别的原子覆盖写操作。写入数据前会先使用 CREATE TABLE LIKE 语句创建一个相同结构的新表,将新的数据导入到新表后,通过 swap 方式原子的替换旧表,以达到表覆盖目的。 |
单次提交最大行 | 任务运行时,单次 StreamLoad 导入的最大记录行数。默认50W。 |
单次提交最大字节 | 任务运行时,单次 StreamLoad 导入的最大字节数。默认100M。 |
行分割符 | 默认为 \\n。 |
前置 SQL | 执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL。 |
后置 SQL | 执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL。 |
高级设置(选填) | 可根据业务需求配置参数。 |
数据类型转换支持
读取
StarRocks 数据类型 | 内部类型 |
TINYINT、SMALLINT、INT、BIGINT | Long |
DECIMAL、DOUBLE、FLOAT | Double |
CAHR、VARCHAR、ARRAY、STRUCT | String |
DATE | Date |
BINARY | Bytes |
BOOLEAN | Boolean |
写入
内部类型 | StarRocks 数据类型 |
Long | BOOLEAN、INT |
Double | DOUBLE |
String | CAHR、VARCHAR、ARRAY、STRUCT |
Date | DATE、DATETIME |
StarRocks 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"password": "******","column": [ //列名"id","name"],"connection": [{"jdbcUrl": ["jdbc:mysql://ip:8030/source_database?query_timeout=100"],"table": [ //源表"source_table"]}],"where": "id>10", //筛选条件"splitPk": "id", //切割键"username": "root"},"name": "starrocksreader"},"transformer": [],"writer": {"parameter": {"postSql": [ //后置sql""],"swapTableName": "sink_table_timestamp", //开启表覆盖后,首先通过create table like创建相同结构的表,之后通过swap方式替换新表"column": [ //列名"id","name"],"swapTable": "true", //是否开启表覆盖"timeZone": "+08:00","database": "sink_database", //目标库"password": "******","beLoadUrl": ["ip:8030"],"labelPrefix": "","maxBatchByteSize": 104857600, //单次提交的最大字节"lineDelimiter": "\\n", //行分割符"jdbcUrl": "jdbc:mysql://ip:8030/sink_database?query_timeout=100","connectTimeout": -1,"table": "sink_table", //目标表"maxBatchRows": 500000, //单次提交的最大行"username": "root","preSql": [ //前置sql""]},"name": "starrockswriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}