Doris/TCHouse-D 数据源

最近更新时间:2024-09-29 17:41:33

我的收藏
TCHouse-D 数据源与 Doris 配置方式相同,此处以 Doris 数据源为例进行以下讲解:

支持版本

支持 Doris 0.x、1.1.x、1.2.x、2.x 版本。

使用限制

1. Doris 写入是通过 Stream load HTTP 接口,需要保证数据源中 FE 或 BE 的 IP 或端口填写正确。
2. 由于 Stream load 的原理是由 BE 发起的导入并分发数据,建议的导入数据量在1G到10G之间。由于默认的最大 Stream load 导入数据量为10G。
要导入超过10G的文件需要修改 BE 的配置 streaming_load_max_mb。
例如:待导入文件大小为15G,修改 BE 配置 streaming_load_max_mb 为16000即可。
3. Stream load 的默认超时为600秒,按照 Doris 目前最大的导入限速来看,约超过3G的文件就需要修改导入任务默认超时时间了。
导入任务超时时间 = 导入数据量 / 10M/s(具体的平均导入速度需要用户根据自己的集群情况计算)。
例如:导入一个10G的文件,timeout = 1000s等于10G / 10M/s

Doris 离线单表读取节点配置




参数
说明
数据来源
需要同步的可用 Doris 数据源。
支持选择、或者手动输入需读取的库名称。
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需读取的表名称。
切割键
指定用于数据分片的字段,指定后将启动并发任务进行数据同步。您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键。
筛选条件(选填)
在实际业务场景中,通常会选择当天的数据进行同步,将 where 条件指定为 gmt_create>$bizdate。where 条件可以有效地进行业务增量同步。如果不填写 where 语句,包括不提供 where 的 key 或 value,数据同步均视作同步全量数据。
高级设置(选填)
可根据业务需求配置参数。

Doris 离线单表写入节点配置




参数
说明
数据去向
需要写入的 Doris 数据源。
支持选择、或者手动输入需写入的库名称。
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需写入的表名称。
当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。
表覆盖
开启后,doris 将支持表级别的原子覆盖写操作。写入数据前会先使用 CREATE TABLE LIKE 语句创建一个相同结构的新表,将新的数据导入到新表后,通过 swap 方式原子的替换旧表,以达到表覆盖目的。
单次提交最大行
一次性批量提交的记录数大小。
单次提交最大字节
一次性批量提交的最大数据量。
行分割符(选填)
Doris 写入的键分隔符,默认'\\n',支持手动输入需要您保证与创建的 Doris 表的字段分隔符一致,否则无法在 Doris 表中查到数据。
前置 SQL
执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,执行前清空表中的旧数据(truncate table tablename)。
后置 SQL
执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,加上某一个时间戳 alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。
高级参数
可根据业务需求配置参数。

数据类型转换支持

读取

Doris 数据类型
内部类型
TINYINT、SMALLINT、INT、BIGINT
Long
FLOAT、DOUBULE、DECIMAL
Double
VARCHAR、CHAR、ARRAY、STRUCT、STRING
String
DATE、DATETIME
Date
BOOLEAN
Boolean

写入

内部类型
Doris 数据类型
Long
TINYINT、SMALLINT、INT、BIGINT
Double
DOUBLE、FLOAT、DECIMAL
String
STRING、VARCHAR、CHAR、ARRAY、STRUCT
Date
DATETIME、DATE
Boolean
BOOLEAN

常见问题

1. 分区不存在报错:"no partition for this tuple"

问题原因:Doris 缺少相应的分区。
解决方案:如果以时间分区,建议开启动态分区。
表 tbl1 分区列 k1 类型为 DATE,创建一个动态分区规则。按天分区,只保留最近7天的分区,并且预先创建未来3天的分区。
CREATE TABLE tbl1
(
k1 DATE,
...
)
PARTITION BY RANGE(k1) ()
DISTRIBUTED BY HASH(k1)
PROPERTIES
(
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
假设当前日期为 2020-05-29。则根据以上规则,tbl1 会产生以下分区:
p20200529: ["2020-05-29", "2020-05-30")
p20200530: ["2020-05-30", "2020-05-31")
p20200531: ["2020-05-31", "2020-06-01")
p20200601: ["2020-06-01", "2020-06-02")
在第二天,即 2020-05-30,会创建新的分区 p20200602: ["2020-06-02", "2020-06-03")
2020-06-06 时,因为 dynamic_partition.start 设置为 7,则将删除7天前的分区,即删除分区 p20200529。
参考:详情请参见 临时分区

2. 同步数据量过大导致:"The size of this batch exceed the max size of json type data"

问题原因:批次提交的数据量太大,建议忽略 JSON 数据量的检测。




3. 导入频次太快导致:"tablet writer write failed, err=-235"

问题原因:导入频次太快导致 VersionCount 超过设置的大小(默认500,由 BE 参数 max_tablet_version_num 控制),建议调整 VersionCount。
解决方案:
1. 找到报错的 tablet_id。
show tablet 28750963;
2. 执行 DetailCmd 显示命令。
SHOW PROC '/dbs/40637/16934967/partitions/28750944/16934968/28750963';



3. 结果中的 versionCount 即表示版本数量。如果发现某个副本的版本数量过多,则需要降低导入频率或停止导入。如果停止导入后,版本数依然没有下降,则需要去对应的 BE 节点查看 be.INFO 日志,搜索 tablet id 以及 compaction 关键词,检查 compaction 是否正常运行。
4. 可以通过增大max_tablet_version_num或者优化tablet 的 compaction,提高压缩效率来减少版本数量。

4. Header 临时分区长度超过限制:"Bad Message 431"

问题原因:temporary_partition 太大,减少 temporary_partition 数量。






参考:详情请参见 动态分区

5. DLC 同步 Doris 时,Doris 端报错:字段超过 schema 长度

问题信息:
uuid 长度设置为200,同步记录中 uuid 长度为232。
Reason: column_name[uuid], the length of input is too long than schema. first 32 bytes of input str: [0000000000000BB4E595527BE******] schema length: 200; actual length: 232; . src line [];
问题原因:
DLC 的 Spark 引擎在计算字符长度时,中文字符占一个字符长度,而 Doris 计算长度时,将中文当成 UTF-8 编译,占3个字符长度。两端计算长度不统一可能导致临界长度的字符串触发预警。
解决方案:
在 DLC 源端修改脏数据
增大 Doris 表对 uuid 长度的限制

Doris/TCHouse-D 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数
"job": {
"content": [
{
"reader": {
"parameter": {
"password": "******",
"column": [ //列名
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://ip:8030/source_database"
],
"table": [ //源表
"source_table"
]
}
],
"where": "id>10", //筛选条件
"splitPk": "id", //切割键
"username": "root"
},
"name": "dorisreader"
},
"transformer": [],
"writer": {
"parameter": {
"postSql": [ //后置sql
""
],
"swapTableName": "sink_table_timestamp", //开启表覆盖后,首先通过create table like创建相同结构的表,之后通过swap方式替换新表
"column": [ //列名
"id",
"name"
],
"swapTable": "true", //是否开启表覆盖
"timeZone": "+08:00",
"database": "sink_database", //目标库
"password": "******",
"beLoadUrl": [
"ip:9030",
"ip:9030",
"ip:9030"
],
"labelPrefix": "",
"maxBatchByteSize": 104857600, //单次提交的最大字节
"lineDelimiter": "\\n", //行分割符
"jdbcUrl": "jdbc:mysql://ip:9030/sink_database",
"connectTimeout": -1,
"table": "sink_table", //目标表
"maxBatchRows": 500000, //单次提交的最大行
"username": "root",
"preSql": [ //前置sql
""
]
},
"name": "doriswriter"
}
}
],
"setting": {
"errorLimit": { //脏数据阈值
"record": 0
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}