数据库 Doris

最近更新时间:2025-02-25 16:10:22

我的收藏

介绍

Flink Connector Doris 目前支持通过 Flink 将数据写入 Doris,基于 开源版本 实现。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
支持,基于社区 release-1.1.1
1.16
支持,基于社区 release-24.0.1
1.18
支持,基于社区 release-24.0.1
注意:
Flink 1.14+ (包含 1.14)默认开启两阶段(2PC)提交。

使用范围

Flink Connector Doris 目前仅支持 Doris sink。支持的 Doris 版本为0.14.0及以上版本,并且要求开启配置 enable_http_server_v2 = true

DDL 定义

注意:
Flink 1.13 跟其他 Flink 版本 的 DDL 参数不同,请选择对应的版本使用。

作为数据目的地 Sink(Flink 1.13)

CREATE TABLE doris_sink_table (
id INT,
name VARCHAR
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'sink.batch.size' = '500', -- 单次写BE的最大行数
'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
);

作为数据目的地 Sink

CREATE TABLE doris_sink_table (
id INT,
name VARCHAR
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password' -- 访问Doris的密码
);

-- 注意: 默认打开 2PC 两阶段提交写入

作为 Catalog

CREATE CATALOG doris_catalog WITH (
'type' = 'doris',
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'default-database' = 'default'
)

WITH 参数 (Flink 1.13 版本)

Sink

参数
说明
是否必填
备注
connector
源表类型
固定值 doris
fenodes
Doris FE HTTP 地址
-
table.identifier
Doris 表名,格式:db1.tbl1
-
username
访问 Doris 的用户名
-
password
访问 Doris 的密码
-
sink.batch.size
单次写 BE 的最大行数
默认100
sink.max-retries
写 BE 失败之后的重试次数
默认1
sink.batch.interval
flush 间隔时间,超过该时间后异步线程将缓存中数据写入 BE。默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0,表示关闭定期写入
默认1s
sink.properties.*
Stream load 的导入 参数。例如 sink.properties.column_separator' = ','
-
sink.enable-2pc
是否采用事务写入
false

Catalog

参数
说明
是否必填
备注
type
-
固定值 doris
fenodes
Doris FE HTTP 地址
-
username
访问 Doris 的用户名
-
password
访问 Doris 的密码
-
default-database
默认的database
-

WITH 参数

Sink

参数
说明
是否必填
默认值
connector
源表类型
固定值 doris
fenodes
Doris FE HTTP 地址
-
table.identifier
Doris 表名,格式:db1.tbl1
-
username
访问 Doris 的用户名
-
password
访问 Doris 的密码
-
sink.properties.*
Stream Load 的导入参数
例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,\\x01会被转换为二进制的 0x01。
JSON 格式导入
'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true',详细参数请参考 Stream load > 导入配置参数
Group Commit 模式
例如:'sink.properties.group_commit' = 'sync_mode' 设置 group commit 为同步模式。flink connector 从 1.6.2 开始支持导入配置 group commit,详细使用和限制请参考 group commit
-
sink.enable-2pc
是否开启两阶段提交 (2pc),默认为 true,保证 Exactly-Once 语义
TRUE
sink.buffer-size
写数据缓存 buffer 大小,单位字节。不建议修改,默认配置即可
1MB
sink.buffer-count
写数据缓存 buffer 个数。不建议修改,默认配置即可
3
sink.max-retries
Commit 失败后的最大重试次数,默认 3 次
3
sink.use-cache
异常时,是否使用内存缓存进行恢复,开启后缓存中会保留 Checkpoint 期间的数据
false
sink.enable.batch-mode
是否使用攒批模式写入 Doris,开启后写入时机不依赖 Checkpoint,通过 sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval 参数来控制写入时机
同时开启后将不保证 Exactly-once 语义,可借助 Uniq 模型做到幂等
false
sink.flush.queue-size
攒批模式下,缓存的队列大小。
2
sink.buffer-flush.max-rows
攒批模式下,单个批次最多写入的数据行数。
500000
sink.buffer-flush.max-bytes
攒批模式下,单个批次最多写入的字节数。
100MB
sink.buffer-flush.interval
攒批模式下,异步刷新缓存的间隔
10s
sink.ignore.update-before
是否忽略 update-before 事件,默认忽略。
true
更多参数说明请参见 Flink Doris Connector

类型映射

Doris 字段类型
Flink 字段类型
NULL_TYPE
NULL
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
BIGINT
FLOAT
FLOAT
DOUBLE
DOUBLE
TIME
DATE
DATE
DATETIME
TIMESTAMP
CHAR
STRING
LARGEINT
VARCHAR
DECIMAL
DECIMAL
DECIMALV2
HLL
Unsupported datatype

代码示例

Flink 1.13

----- Flink 1.13 -----
CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

CREATE TABLE doris_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'sink.batch.size' = '500', -- 单次写BE的最大行数
'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
);

INSERT INTO doris_sink_table select * from datagen_source_table;

Flink 1.14、 Flink 1.16、 Flink 1.18


CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

CREATE TABLE doris_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:HTTP_PORT',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);

INSERT INTO doris_sink_table select * from datagen_source_table;
CREATE CATALOG doris_catalog WITH (
'fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'default-database' = 'default'
);

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

INSERT INTO `doris_catalog`.`my_database`.`my_table` SELECT * FROM.datagen_source_table;
MySQL-CDC 对接 Doris 代码示例:
--mysql cdc 源表
CREATE TABLE `mysql_cdc_source_table` (
`id` INT NOT NULL,
`name` VARCHAR,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
'hostname' = 'YourHostName', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'YourUserName', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
'password' = 'YourPassword', -- 数据库访问的密码
'database-name' = 'YourDatabase', -- 需要同步的数据库
'table-name' = 'YourTable' -- 需要同步的数据表名
);

--写入doris表
CREATE TABLE `print_table` (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'dbName.tableName', -- Doris 表名 格式:db.tbl
'username' = 'YourUserName', -- 访问Doris的用户名,拥有库的写权限
'password' = 'YourPassword', -- 访问Doris的密码
);

insert into print_table
select id,name from mysql_cdc_source_table;

注意事项

Upsert

若需要 Upsert ,则要求 Doris 表必须是 Uniqe 模型或者 Aggregate 模型。建表示例如下:
-- Uniqe 模型建表语句
CREATE TABLE `doris_sink_table` (
`id` int(11),
`name` varchar(32)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES("replication_num" = "3");

-- Aggregate 模型建表语句
CREATE TABLE `doris_sink_table` (
`id` int(11),
`name` varchar(32) REPLACE DEFAULT '0'
)
AGGREGATE KEY('id')
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES("replication_num" = "3"); -- 注意若 BE 节点不够,会报 `Failed to find enough host in all backends` 错误,可适当减少该值。

用户权限

用户必须拥有对应的库的写权限。
CREATE USER 'test' IDENTIFIED BY 'test_passwd';
GRANT ALL ON test TO test;

更多常见问题