前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >为什么越来越多的大厂都在用这个实时数仓!Cover全场景导入方式,这个神器是真香

为什么越来越多的大厂都在用这个实时数仓!Cover全场景导入方式,这个神器是真香

作者头像
一臻数据
发布2024-12-24 16:16:27
发布2024-12-24 16:16:27
9600
代码可运行
举报
文章被收录于专栏:一臻数据一臻数据
运行总次数:0
代码可运行

深夜,又一个数据工程师正在苦苦思索数据入库的问题。"这么多数据,到底该用什么方式导入最合适?"他望着屏幕上的源源不断的数据流,陷入了沉思。 或许你也曾面临类似困扰:每日数百TB的跑批数据、日志文件等待入库、Kafka消息需要实时接入、高频小批量写入该如何提升性能...不同场景下的数据接入需求让人头疼不已。 作为一线数据摸鱼师,深知这些痛点。通过多年实践经验发现,选对工具和方法,这些"看似难题"便能迎刃而解。今天,就让我们一起探索Doris数据导入的奥秘。不仅要知其然,更要知其所以然。

Doris数据导入全攻略:从入门到实践

"数据如何高效导入Apache Doris?"

这个问题困扰着很多数据工程师。作为一款高性能的OLAP数据库,Apache Doris提供了丰富多样的数据导入方式,能够满足不同场景下的数据接入需求。今天就带大家一起深入了解Doris的数据导入能力。

在正式开始前,让我们先通过如下图标来整体认识下Doris的数据导入体系。

Doris的数据导入方式主要分为四大类:

  • 实时写入:应用程序通过 HTTP 或者 JDBC 实时写入数据到 Doris 表中,适用于需要实时分析和查询的场景。
    • 极少量数据(5 分钟一次)时可以使用 JDBC INSERT 写入数据。
    • 并发较高或者频次较高(大于 20 并发或者 1 分钟写入多次)时建议打开 Group Commit,使用 JDBC INSERT 或者 Stream Load 写入数据。
    • 吞吐较高时推荐使用 Stream Load 通过 HTTP 写入数据。
  • 流式同步:通过实时数据流(如 Flink、Kafka、事务数据库)将数据实时导入到 Doris 表中,适用于需要实时分析和查询的场景。
    • 可以使用 Flink Doris Connector 将 Flink 的实时数据流写入到 Doris 表中。
    • 可以使用 Routine Load 或者 Doris Kafka Connector 将 Kafka 的实时数据流写入到 Doris 表中。Routine Load 方式下,Doris 会调度任务将 Kafka 中的数据拉取并写入 Doris 中,目前支持 csv 和 json 格式的数据。Kafka Connector 方式下,由 Kafka 将数据写入到 Doris 中,支持 avro、json、csv、protobuf 格式的数据。
    • 可以使用 Flink CDC 或 Datax 将事务数据库的 CDC 数据流写入到 Doris 中。
  • 批量导入:将数据从外部存储系统(如 S3、HDFS、本地文件、NAS)批量加载到 Doris 表中,适用于非实时数据导入的需求。
    • 可以使用 Broker Load 将 S3 和 HDFS 中的文件写入到 Doris 中。
    • 可以使用 INSERT INTO SELECT 将 S3、HDFS 和 NAS 中的文件同步写入到 Doris 中,配合 JOB 可以异步写入。
    • 可以使用 Stream Load 或者 Doris Streamloader 将本地文件写入 Doris 中。
  • 外部数据源集成:通过与外部数据源(如 Hive、JDBC、Iceberg 等)的集成,实现对外部数据的查询和部分数据导入到 Doris 表中。
    • 可以创建 Catalog 读取外部数据源中的数据,使用 INSERT INTO SELECT 将外部数据源中的数据同步写入到 Doris 中,配合 JOB 可以异步写入。
    • 可以使用 X2Doris 将其他 AP 系统的数据迁移到 Doris 中。

值得一提的是,Doris 的每个导入默认都是一个隐式事务,事务相关的更多信息可查阅Doris官网中事务章节的介绍。

接下来,主要给各位看官揭秘Stream Load、Routine Load以及Doris2.1版本最新推出的Group Commit三大主流导入方式的技术细节和最佳实践。无论您是刚接触Doris的新手,还是正在寻求导入优化的老手,相信都能在这里找到答案。

Stream Load:高效可靠的通用导入利器

Stream Load是Doris最常用的数据导入方式,它通过HTTP协议将数据实时导入到Doris中。我们来看看它的主要流程:

Stream Load的工作流程非常清晰:

  1. Client 向 FE 提交 Stream Load 导入作业请求
  2. FE 会轮询选择一台 BE 作为 Coordinator 节点,负责导入作业调度,然后返回给 Client 一个 HTTP 重定向
  3. Client 连接 Coordinator BE 节点,提交导入请求
  4. Coordinator BE 会分发数据给相应 BE 节点,导入完成后会返回导入结果给 Client
  5. Client 也可以直接通过指定 BE 节点作为 Coordinator,直接分发导入作业

需要注意的是,Stream Load支持CSV、JSON、Parquet与ORC多种格式,单次导入数据量建议控制在10GB以下。对于超大文件,可以使用Doris官方提供的Streamloader工具进行并发导入,它提供了断点续传、自动重试等强大功能。

快速上手Demo

Stream Load 通过 HTTP 协议提交和传输。下例以 curl 工具为例,演示通过 Stream Load 提交导入Json数据的作业案例。

1.创建导入数据

创建 JSON 文件 streamload_example.json。具体内容如下

代码语言:javascript
代码运行次数:0
复制
[
{"userid":1,"username":"Emily","userage":25},
{"userid":2,"username":"Benjamin","userage":35},
{"userid":3,"username":"Olivia","userage":28},
{"userid":4,"username":"Alexander","userage":60},
{"userid":5,"username":"Ava","userage":17},
{"userid":6,"username":"William","userage":69},
{"userid":7,"username":"Sophia","userage":32},
{"userid":8,"username":"James","userage":64},
{"userid":9,"username":"Emma","userage":37},
{"userid":10,"username":"Liam","userage":64}
]

2.创建导入 Doris 表

在 Doris 中创建被导入的表,具体语法如下:

代码语言:javascript
代码运行次数:0
复制
CREATE TABLE testdb.test_streamload(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

3.导入作业

通过 curl 命令可以提交 Stream Load 导入作业。

代码语言:javascript
代码运行次数:0
复制
curl --location-trusted -u <doris_user>:<doris_password> \
    -H "label:124" \
    -H "Expect:100-continue" \
    -H "format:json" -H "strip_outer_array:true" \
    -H "jsonpaths:[\"$.userid\", \"$.username\", \"$.userage\"]" \
    -H "columns:user_id,name,age" \
    -T streamload_example.json \
    -XPUT http://<fe_ip>:<fe_http_port>/api/testdb/test_streamload/_stream_load

其中,JSON 文件内容不是 JSON Array,而是每行一个 JSON 对象,添加

代码语言:javascript
代码运行次数:0
复制
Header -H "strip_outer_array:false" -H "read_json_by_line:true"

Stream Load 是一种同步导入方式,导入结果detail会直接返回给用户。

代码语言:javascript
代码运行次数:0
复制
{
    "TxnId": 7,
    "Label": "125",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 10,
    "NumberLoadedRows": 10,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 471,
    "LoadTimeMs": 52,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 11,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 23,
    "CommitAndPublishTimeMs": 16
}

默认情况下,Stream Load 是同步返回给 Client,所以系统模式是不记录 Stream Load 历史作业的。如果需要记录,则在 be.conf 中添加配置 enable_stream_load_record=true。具体配置可以参考 BE 配置项。

配置后,可以通过 show stream load 命令查看已完成的 Stream Load 任务。

代码语言:javascript
代码运行次数:0
复制
mysql> show stream load from testdb;
+-------+--------+-----------------+---------------+---------+---------+------+-----------+------------+--------------+----------------+-----------+-------------------------+-------------------------+------+---------+
| Label | Db     | Table           | ClientIp      | Status  | Message | Url  | TotalRows | LoadedRows | FilteredRows | UnselectedRows | LoadBytes | StartTime               | FinishTime              | User | Comment |
+-------+--------+-----------------+---------------+---------+---------+------+-----------+------------+--------------+----------------+-----------+-------------------------+-------------------------+------+---------+
| 12356 | testdb | test_streamload | 192.168.88.31 | Success | OK      | N/A  | 10        | 10         | 0            | 0              | 118       | 2023-11-29 08:53:00.594 | 2023-11-29 08:53:00.650 | root |         |
+-------+--------+-----------------+---------------+---------+---------+------+-----------+------------+--------------+----------------+-----------+-------------------------+-------------------------+------+---------+
1 row in set (0.00 sec)

Routine Load:Kafka数据实时入库的最佳搭档

实时数据处理已成为当代企业的必备能力。Doris通过Routine Load功能,为Kafka数据接入提供了一站式解决方案。这是一个常驻服务,会持续消费Kafka中的数据并写入Doris表中。

我们来看一个典型场景:某电商平台需要实时分析用户行为数据。每当用户浏览商品、加入购物车或下单时,相关事件都会实时写入Kafka。通过Routine Load,这些数据能够被持续导入到Doris中,供实时分析使用。

Routine Load的核心优势在于支持Exactly-Once语义,确保数据不丢不重。它的执行流程是这样的:

  1. Client 向 FE 提交 Routine Load 常驻 Routine Load Job
  2. FE 通过 Job Scheduler 将 Routine Load Job 拆分成若干个 Routine Load Task
  3. 在 BE 上,一个 Routine Load Task 会被视为 Stream Load 任务进行导入,导入完成后向 FE 汇报
  4. FE 中的 Job Scheduler 根据汇报结果,继续生成新的 Task,或对失败的 Task 进行重试
  5. Routine Load Job 会不断产生新的 Task,来完成数据的不间断导入

FE节点会将常驻的导入作业拆分成多个任务,分配给不同的BE节点执行。每个任务以Stream Load的方式写入数据,任务完成后向FE汇报结果。FE根据执行情况继续生成新任务或重试失败任务。

在使用Routine Load时有几个关键点需要注意:

  • 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群;
  • 支持的消息格式为 CSV 及 JSON 文本格式。CSV 每一个 message 为一行,且行尾不包含换行符;
  • 默认支持 Kafka 0.10.0.0(含)以上版本。

快速上手Demo

以导入JSON数据为例:

1.导入样本数据

在 Kafka 中,有以下样本数据

代码语言:javascript
代码运行次数:0
复制
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-json --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64}

2.创建需要导入的表

在 Doris 中,创建被导入的表,具体语法如下

代码语言:javascript
代码运行次数:0
复制
CREATE TABLE testdb.test_streamload(
    user_id            BIGINT       NOT NULL COMMENT "用户 ID",
    name               VARCHAR(20)           COMMENT "用户姓名",
    age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

3.创建 Routine Load 导入作业

在 Doris 中,使用 CREATE ROUTINE LOAD 命令,创建导入作业

代码语言:javascript
代码运行次数:0
复制
CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
COLUMNS(user_id,name,age)
PROPERTIES(
    "format"="json",
    "jsonpaths"="[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-json",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

导入后,可以通过 SHOW ROUTINE LOAD 命令查看导入作业情况,如导入目标表、导入延迟状态、导入配置信息、导入错误信息等。

如通过以下命令可以查看对应ROUTINE LOAD的任务情况:

代码语言:javascript
代码运行次数:0
复制
mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load\G
*************************** 1. row ***************************
                  Id: 12025
                Name: example_routine_load
          CreateTime: 2024-01-15 08:12:42
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:testdb
           TableName: test_routineload_tbl
        IsMultiTable: false
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"max_batch_rows":"200000","timezone":"America/New_York","send_batch_parallelism":"1","load_to_single_tablet":"false","column_separator":"','","line_delimiter":"\n","current_concurrent_number":"1","delete":"*","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"user_id,name,age","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"csv","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"test-topic","currentKafkaPartitions":"0","brokerList":"192.168.88.62:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"example_routine_load_73daf600-884e-46c0-a02b-4e49fdf3b4dc"}
           Statistic: {"receivedBytes":28,"runningTxns":[],"errorRows":0,"committedTaskNum":3,"loadedRows":3,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":3,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":30069}
            Progress: {"0":"2"}
                 Lag: {"0":0}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
                User: root
             Comment:
1 row in set (0.00 sec)

Group Commit:高频写入的性能倍增器

在Doris数据导入过程中,不同批次导入的数据都会写入内存表中,随后在磁盘中上形成一个个 RowSet 文件,每个 Rowset 文件对应一次数据导入版本。后台 Compaction 进程会自动对多个版本的 RowSet 文件进行合并,将多个 RowSet 小文件合并成 RowSet 大文件以优化查询性能以及存储空间,而每一次的 Compaction 进程都会产生对 CPU、内存以及磁盘 IO 资源的消耗。在实际数据写入场景中,写入越实时高频、生成 RowSet 版本数越高、Compaction 所消耗的资源就越大。为了避免高频写入带来的过多资源消耗甚至 OOM,Apache Doris 引入了反压机制,即在版本过多的情况下会返回 -235,并对数据的版本数量进行控制。

Apache Doris 2.1 版本开始,我们引入了服务端攒批 Group Commit,大幅强化了高并发、高频实时写入的能力。

顾名思义,Group Commit 会把用户侧的多次写入在 BE 端进行积攒后批量提交。对于用户而言,无需控制写入程序的频率,Doris 会自动把用户提交的多次写入在内部合并为一个版本,从而可以大幅提升用户侧的写入频次。

当前 Group Commit 已经支持同步模式 sync_mode 和异步模式 async_mode。同步模式下会将多个导入在一个事务提交,事务提交后导入返回,在导入完成后数据立即可见。异步模式下数据会先写入 WAL,Apache Doris 会根据负载和表的group_commit_interval属性异步提交数据,提交之后数据可见。为了防止 WAL 占用较大的磁盘空间,单次导入数据量较大时,会自动切换为sync_mode。

Doris分别采取 JDBC 和 Stream Load 两种方式对高并发写入场景下 Group Commit(异步模式 async_mode)的写入性能进行了测试,测试报告如下:

  • JDBC 写入
    • 集群配置为 1FE 1BE,数据集为 TPC-H SF10 Lineitem 表,总共约 22GB、1.8 亿行;
    • 经测试,在并发数 20、单次 Insert 数据行数 100 行下,导入效率达到 10.69w 行/秒、导入吞吐达 11.46 MB/秒,BE 节点的 CPU 使用率稳定保持在 10%-20%;
  • Stream Load 写入
    • 集群配置为 1FE 3BE,数据集为 httplogs、总共 31GB、2.47 亿行。在未开启 Group Commit 和 开启 Group Commit 的异步模式时,通过设置不同的单并发数据量和并发数,对比数据的写入性能。
    • 经测试,在并发数 10、单次导入数据量 1 MB 下,未开启 Group Commit 时会提示 -235 错误,开启后可稳定运行且导入效率达 81w 行/秒、导入吞吐达 104 MB/秒;在并发数 10、单次导入数据量 10MB 下,开启 Group Commit 后耗时降低至原先的 55%、导入吞吐提升 79%;

快速上手Demo

在动手之前,先来一起了解下Group Commit 写入有三种模式,分别是:

  • 关闭模式(off_mode):不开启 Group Commit,保持以上三种导入方式的默认行为。
  • 同步模式(sync_mode):Doris 根据负载和表的 group_commit_interval属性将多个导入在一个事务提交,事务提交后导入返回。这适用于高并发写入场景,且在导入完成后要求数据立即可见。
  • 异步模式(async_mode):Doris 首先将数据写入 WAL (Write Ahead Log),然后导入立即返回。Doris 会根据负载和表的group_commit_interval属性异步提交数据,提交之后数据可见。为了防止 WAL 占用较大的磁盘空间,单次导入数据量较大时,会自动切换为sync_mode。这适用于写入延迟敏感以及高频写入的场景。

以INSERT INTO VALUES导入为例:

1.测试表创建

代码语言:javascript
代码运行次数:0
复制
CREATE TABLE `dt` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NULL,
    `score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"
);

2.异步模式

代码语言:javascript
代码运行次数:0
复制
# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式
mysql> set group_commit = async_mode;

# 这里返回的 label 是 group_commit 开头的,可以区分出是否使用了 group commit
mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99);
Query OK, 2 rows affected (0.05 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中
mysql> insert into dt(id, name) values(3, 'John');
Query OK, 1 row affected (0.01 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# 不能立刻查询到
mysql> select * from dt;
Empty set (0.01 sec)

# 10 秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。
mysql> select * from dt;
+------+-------+-------+
| id   | name  | score |
+------+-------+-------+
|    1 | Bob   |    90 |
|    2 | Alice |    99 |
|    3 | John  |  NULL |
+------+-------+-------+
3 rows in set (0.02 sec)

3.同步模式

代码语言:javascript
代码运行次数:0
复制
# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式
mysql> set group_commit = sync_mode;

# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。
mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99);
Query OK, 2 rows affected (10.06 sec)
{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}

# 数据可以立刻读出
mysql> select * from dt;
+------+-------+-------+
| id   | name  | score |
+------+-------+-------+
|    1 | Bob   |    90 |
|    2 | Alice |    99 |
|    3 | John  |  NULL |
|    4 | Bob   |    90 |
|    5 | Alice |    99 |
+------+-------+-------+
5 rows in set (0.03 sec)

4.关闭模式

代码语言:javascript
代码运行次数:0
复制
mysql> set group_commit = off_mode;

小结

数据导入看似简单,实则暗藏玄机。好在Doris提供了丰富的导入方案,只要了解各自的特点和适用场景,就能轻松应对各种数据接入需求。

如果要问"Doris最佳导入方式"是什么?

答案是:不存在放之四海而皆准的方案,关键在于结合业务场景选择合适的导入策略。就像中国功夫讲究"无招胜有招",掌握了这些导入方式的精髓,才能做到"得心应手"。

下期我们将深入探讨Doris其它更有趣有用有价值的内容,敬请期待!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一臻数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Doris数据导入全攻略:从入门到实践
  • Stream Load:高效可靠的通用导入利器
    • 快速上手Demo
      • 1.创建导入数据
      • 2.创建导入 Doris 表
      • 3.导入作业
  • Routine Load:Kafka数据实时入库的最佳搭档
    • 快速上手Demo
      • 1.导入样本数据
      • 2.创建需要导入的表
      • 3.创建 Routine Load 导入作业
  • Group Commit:高频写入的性能倍增器
    • 快速上手Demo
      • 1.测试表创建
      • 2.异步模式
      • 3.同步模式
      • 4.关闭模式
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档