“深夜,又一个数据工程师正在苦苦思索数据入库的问题。"这么多数据,到底该用什么方式导入最合适?"他望着屏幕上的源源不断的数据流,陷入了沉思。 或许你也曾面临类似困扰:每日数百TB的跑批数据、日志文件等待入库、Kafka消息需要实时接入、高频小批量写入该如何提升性能...不同场景下的数据接入需求让人头疼不已。 作为一线数据摸鱼师,深知这些痛点。通过多年实践经验发现,选对工具和方法,这些"看似难题"便能迎刃而解。今天,就让我们一起探索Doris数据导入的奥秘。不仅要知其然,更要知其所以然。
"数据如何高效导入Apache Doris?"
这个问题困扰着很多数据工程师。作为一款高性能的OLAP数据库,Apache Doris提供了丰富多样的数据导入方式,能够满足不同场景下的数据接入需求。今天就带大家一起深入了解Doris的数据导入能力。
在正式开始前,让我们先通过如下图标来整体认识下Doris的数据导入体系。
Doris的数据导入方式主要分为四大类:
值得一提的是,Doris 的每个导入默认都是一个隐式事务,事务相关的更多信息可查阅Doris官网中事务章节的介绍。
接下来,主要给各位看官揭秘Stream Load、Routine Load以及Doris2.1版本最新推出的Group Commit三大主流导入方式的技术细节和最佳实践。无论您是刚接触Doris的新手,还是正在寻求导入优化的老手,相信都能在这里找到答案。
Stream Load是Doris最常用的数据导入方式,它通过HTTP协议将数据实时导入到Doris中。我们来看看它的主要流程:
Stream Load的工作流程非常清晰:
需要注意的是,Stream Load支持CSV、JSON、Parquet与ORC多种格式,单次导入数据量建议控制在10GB以下。对于超大文件,可以使用Doris官方提供的Streamloader工具进行并发导入,它提供了断点续传、自动重试等强大功能。
Stream Load 通过 HTTP 协议提交和传输。下例以 curl 工具为例,演示通过 Stream Load 提交导入Json数据的作业案例。
创建 JSON 文件 streamload_example.json。具体内容如下
[
{"userid":1,"username":"Emily","userage":25},
{"userid":2,"username":"Benjamin","userage":35},
{"userid":3,"username":"Olivia","userage":28},
{"userid":4,"username":"Alexander","userage":60},
{"userid":5,"username":"Ava","userage":17},
{"userid":6,"username":"William","userage":69},
{"userid":7,"username":"Sophia","userage":32},
{"userid":8,"username":"James","userage":64},
{"userid":9,"username":"Emma","userage":37},
{"userid":10,"username":"Liam","userage":64}
]
在 Doris 中创建被导入的表,具体语法如下:
CREATE TABLE testdb.test_streamload(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
通过 curl 命令可以提交 Stream Load 导入作业。
curl --location-trusted -u <doris_user>:<doris_password> \
-H "label:124" \
-H "Expect:100-continue" \
-H "format:json" -H "strip_outer_array:true" \
-H "jsonpaths:[\"$.userid\", \"$.username\", \"$.userage\"]" \
-H "columns:user_id,name,age" \
-T streamload_example.json \
-XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load
其中,JSON 文件内容不是 JSON Array,而是每行一个 JSON 对象,添加
Header -H "strip_outer_array:false" -H "read_json_by_line:true"
Stream Load 是一种同步导入方式,导入结果detail会直接返回给用户。
{
"TxnId": 7,
"Label": "125",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 10,
"NumberLoadedRows": 10,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 471,
"LoadTimeMs": 52,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 11,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 23,
"CommitAndPublishTimeMs": 16
}
默认情况下,Stream Load 是同步返回给 Client,所以系统模式是不记录 Stream Load 历史作业的。如果需要记录,则在 be.conf 中添加配置 enable_stream_load_record=true。具体配置可以参考 BE 配置项。
配置后,可以通过 show stream load 命令查看已完成的 Stream Load 任务。
mysql> show stream load from testdb;
+-------+--------+-----------------+---------------+---------+---------+------+-----------+------------+--------------+----------------+-----------+-------------------------+-------------------------+------+---------+
| Label | Db | Table | ClientIp | Status | Message | Url | TotalRows | LoadedRows | FilteredRows | UnselectedRows | LoadBytes | StartTime | FinishTime | User | Comment |
+-------+--------+-----------------+---------------+---------+---------+------+-----------+------------+--------------+----------------+-----------+-------------------------+-------------------------+------+---------+
| 12356 | testdb | test_streamload | 192.168.88.31 | Success | OK | N/A | 10 | 10 | 0 | 0 | 118 | 2023-11-29 08:53:00.594 | 2023-11-29 08:53:00.650 | root | |
+-------+--------+-----------------+---------------+---------+---------+------+-----------+------------+--------------+----------------+-----------+-------------------------+-------------------------+------+---------+
1 row in set (0.00 sec)
实时数据处理已成为当代企业的必备能力。Doris通过Routine Load功能,为Kafka数据接入提供了一站式解决方案。这是一个常驻服务,会持续消费Kafka中的数据并写入Doris表中。
我们来看一个典型场景:某电商平台需要实时分析用户行为数据。每当用户浏览商品、加入购物车或下单时,相关事件都会实时写入Kafka。通过Routine Load,这些数据能够被持续导入到Doris中,供实时分析使用。
Routine Load的核心优势在于支持Exactly-Once语义,确保数据不丢不重。它的执行流程是这样的:
FE节点会将常驻的导入作业拆分成多个任务,分配给不同的BE节点执行。每个任务以Stream Load的方式写入数据,任务完成后向FE汇报结果。FE根据执行情况继续生成新任务或重试失败任务。
在使用Routine Load时有几个关键点需要注意:
以导入JSON数据为例:
在 Kafka 中,有以下样本数据
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-json --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64}
在 Doris 中,创建被导入的表,具体语法如下
CREATE TABLE testdb.test_streamload(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
在 Doris 中,使用 CREATE ROUTINE LOAD 命令,创建导入作业
CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
COLUMNS(user_id,name,age)
PROPERTIES(
"format"="json",
"jsonpaths"="[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA(
"kafka_broker_list" = "192.168.88.62:9092",
"kafka_topic" = "test-routine-load-json",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
导入后,可以通过 SHOW ROUTINE LOAD 命令查看导入作业情况,如导入目标表、导入延迟状态、导入配置信息、导入错误信息等。
如通过以下命令可以查看对应ROUTINE LOAD的任务情况:
mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load\G
*************************** 1. row ***************************
Id: 12025
Name: example_routine_load
CreateTime: 2024-01-15 08:12:42
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:testdb
TableName: test_routineload_tbl
IsMultiTable: false
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"max_batch_rows":"200000","timezone":"America/New_York","send_batch_parallelism":"1","load_to_single_tablet":"false","column_separator":"','","line_delimiter":"\n","current_concurrent_number":"1","delete":"*","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"user_id,name,age","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"csv","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"test-topic","currentKafkaPartitions":"0","brokerList":"192.168.88.62:9092"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"example_routine_load_73daf600-884e-46c0-a02b-4e49fdf3b4dc"}
Statistic: {"receivedBytes":28,"runningTxns":[],"errorRows":0,"committedTaskNum":3,"loadedRows":3,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":3,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":30069}
Progress: {"0":"2"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
User: root
Comment:
1 row in set (0.00 sec)
在Doris数据导入过程中,不同批次导入的数据都会写入内存表中,随后在磁盘中上形成一个个 RowSet 文件,每个 Rowset 文件对应一次数据导入版本。后台 Compaction 进程会自动对多个版本的 RowSet 文件进行合并,将多个 RowSet 小文件合并成 RowSet 大文件以优化查询性能以及存储空间,而每一次的 Compaction 进程都会产生对 CPU、内存以及磁盘 IO 资源的消耗。在实际数据写入场景中,写入越实时高频、生成 RowSet 版本数越高、Compaction 所消耗的资源就越大。为了避免高频写入带来的过多资源消耗甚至 OOM,Apache Doris 引入了反压机制,即在版本过多的情况下会返回 -235,并对数据的版本数量进行控制。
从 Apache Doris 2.1 版本开始,我们引入了服务端攒批 Group Commit,大幅强化了高并发、高频实时写入的能力。
顾名思义,Group Commit 会把用户侧的多次写入在 BE 端进行积攒后批量提交。对于用户而言,无需控制写入程序的频率,Doris 会自动把用户提交的多次写入在内部合并为一个版本,从而可以大幅提升用户侧的写入频次。
当前 Group Commit 已经支持同步模式 sync_mode 和异步模式 async_mode。同步模式下会将多个导入在一个事务提交,事务提交后导入返回,在导入完成后数据立即可见。异步模式下数据会先写入 WAL,Apache Doris 会根据负载和表的group_commit_interval属性异步提交数据,提交之后数据可见。为了防止 WAL 占用较大的磁盘空间,单次导入数据量较大时,会自动切换为sync_mode。
Doris分别采取 JDBC 和 Stream Load 两种方式对高并发写入场景下 Group Commit(异步模式 async_mode)的写入性能进行了测试,测试报告如下:
在动手之前,先来一起了解下Group Commit 写入有三种模式,分别是:
以INSERT INTO VALUES导入为例:
CREATE TABLE `dt` (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式
mysql> set group_commit = async_mode;
# 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit
mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99);
Query OK, 2 rows affected (0.05 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中
mysql> insert into dt(id, name) values(3, 'John');
Query OK, 1 row affected (0.01 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
# 不能立刻查询到
mysql> select * from dt;
Empty set (0.01 sec)
# 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。
mysql> select * from dt;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
+------+-------+-------+
3 rows in set (0.02 sec)
# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式
mysql> set group_commit = sync_mode;
# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。
mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99);
Query OK, 2 rows affected (10.06 sec)
{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
# 数据可以立刻读出
mysql> select * from dt;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
| 4 | Bob | 90 |
| 5 | Alice | 99 |
+------+-------+-------+
5 rows in set (0.03 sec)
mysql> set group_commit = off_mode;
数据导入看似简单,实则暗藏玄机。好在Doris提供了丰富的导入方案,只要了解各自的特点和适用场景,就能轻松应对各种数据接入需求。
如果要问"Doris最佳导入方式"是什么?
答案是:不存在放之四海而皆准的方案,关键在于结合业务场景选择合适的导入策略。就像中国功夫讲究"无招胜有招",掌握了这些导入方式的精髓,才能做到"得心应手"。
下期我们将深入探讨Doris其它更有趣有用有价值的内容,敬请期待!