前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >3分钟搞懂Arrow Flight SQL,让数据传输提速100倍的秘密

3分钟搞懂Arrow Flight SQL,让数据传输提速100倍的秘密

作者头像
一臻数据
发布2025-01-20 20:25:08
发布2025-01-20 20:25:08
10100
代码可运行
举报
文章被收录于专栏:一臻数据一臻数据
运行总次数:0
代码可运行

此时,数据分析师小华揉着发酸的眼睛,望着电脑屏幕发呆。 他忍不住抱怨道:"这数据导出也太慢了吧!" 是的,又一次等待MySQL协议传输大批量数据,这感觉像是用吸管在喝一桶水,得喝到什么时候才能见底? 就在小华等待传输过程中,翻查着Doris官方文档,发现了个能提升数据传输效率 100 倍的神器 - Arrow Flight SQL! 传统的数据传输像是在玩"倒水"游戏,要经过好几道工序;而Arrow Flight SQL直接架起了一条高速公路,让数据畅通无阻地奔跑。 接下来,追随着小华的故事,3分钟搞懂Arrow Flight SQL!

数据传输提速100倍!

数据分析师小华最近遇到了一个棘手的问题。

他需要从Apache Doris中读取海量数据进行实时分析,传统的MySQL协议每次查询都要经过繁琐的序列化和反序列化过程,速度慢得让人抓狂。

"有没有更快的数据传输方案?" 小华苦恼地挠着头。

于是,小华在Doris官方不停地翻阅着...终于找到Apache Doris 2.1版本带来了革命性的突破 - Arrow Flight SQL高速数据链路

这个基于Apache Arrow的解决方案:数据传输性能相较于 MySQL 协议提升了惊人的100倍!

如何做到100倍提升?

小华自言自语喃喃道:"你知道传统的MySQL协议数据传输有多慢吗?"

好比是把一桶水倒进另一个桶里,还要经过一个漏斗。数据从Doris的列存格式要先转成MySQL的行存格式,再由客户端转回列存格式,这个过程浪费了大量时间。

Arrow Flight SQL则完全不同。它直接架设起一条直通管道,数据从Doris直接以Arrow列存格式传输到客户端,无需任何转换。这种近乎零拷贝的传输方式让数据传输效率提升了近100倍!

如上图所示,在 Doris 中查询结果以列存格式的 Block 组织。在 2.1 以前版本,可以通过 MySQL Client 或 JDBC/ODBC 驱动传输至目标客户端,需要将行存格式的 Bytes 再反序列化为列存格式。

基于 Arrow Flight SQL 构建高速数据传输链路,若目标客户端同样支持 Arrow 列存格式,整体传输过程将完全避免序列化/反序列化操作,彻底消除因此带来时间及性能损耗

更令人amazing的是,Arrow Flight SQL还支持多节点并行传输,充分利用现代硬件的多核优势。对于数据科学家和分析师来说,这意味着可以在几秒钟内获取海量数据进行分析,大大提升了工作效率!

让数据传输起飞!

了解完基本原理后,小华迫不及待地想试试这个"神器"。

实际上,在Python和Java中使用Arrow Flight SQL非常简单。

在Python中,只需几步就能快速建立高速数据通道:

代码语言:javascript
代码运行次数:0
复制
# Doris Arrow Flight SQL Test

# step 1, library is released on PyPI and can be easily installed.
# pip install adbc_driver_manager
# pip install adbc_driver_flightsql
import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

# step 2, create a client that interacts with the Doris Arrow Flight SQL service.
# Modify arrow_flight_sql_port in fe/conf/fe.conf to an available port, such as 9090.
# Modify arrow_flight_sql_port in be/conf/be.conf to an available port, such as 9091.
conn = flight_sql.connect(uri="grpc://{FE_HOST}:{fe.conf:arrow_flight_sql_port}", db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
        })
cursor = conn.cursor()

# interacting with Doris via SQL using Cursor
def execute(sql):
    print("\n### execute query: ###\n " + sql)
    cursor.execute(sql)
    print("### result: ###")
    print(cursor.fetchallarrow().to_pandas())

# step3, execute DDL statements, create database/table, show stmt.
execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;")
execute("show databases;")
execute("create database arrow_flight_sql;")
execute("show databases;")
execute("use arrow_flight_sql;")
execute("""CREATE TABLE arrow_flight_sql_test
    (
         k0 INT,
         k1 DOUBLE,
         K2 varchar(32) NULL DEFAULT "" COMMENT "",
         k3 DECIMAL(27,9) DEFAULT "0",
         k4 BIGINT NULL DEFAULT '10',
         k5 DATE,
    )
    DISTRIBUTED BY HASH(k5) BUCKETS 5
    PROPERTIES("replication_num" = "1");""")
execute("show create table arrow_flight_sql_test;")


# step4, insert into
execute("""INSERT INTO arrow_flight_sql_test VALUES
        ('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'),
        ('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'),
        ('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'),
        ('3', 4, "ID", 4, 4, '2023-10-22'),
        ('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""")


# step5, execute queries, aggregation, sort, set session variable
execute("select * from arrow_flight_sql_test order by k0;")
execute("set exec_mem_limit=2000;")
execute("show variables like \"%exec_mem_limit%\";")
execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;")

# step6, close cursor 
cursor.close()

对Java开发者来说,Arrow Flight SQL同样提供了优雅的解决方案。你可以选择JDBC风格的API:

代码语言:javascript
代码运行次数:0
复制
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
String DB_URL = "jdbc:arrow-flight-sql://{FE_HOST}:{fe.conf:arrow_flight_sql_port}?useServerPrepStmts=false"
        + "&cachePrepStmts=true&useSSL=false&useEncryption=false";
String USER = "root";
String PASS = "";

Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
Statement stmt = conn.createStatement();
ResultSet resultSet = stmt.executeQuery("select * from information_schema.tables;");
while (resultSet.next()) {
    System.out.println(resultSet.toString());
}

resultSet.close();
stmt.close();
conn.close();

经过一番测试体验之后,小华在实践中总结出了几个提升性能的关键技巧:

1. 智能批处理

默认的批处理大小是1024行,可以根据实际场景调整对应值:setTargetBatchSize。

对于内存充足的环境,适当增大批处理大小能显著提升吞吐量。

2. 并行加速

Java开发者可以使用FlightClient实现多Endpoint并行读取,更加灵活地利用集群资源。

一个典型的优化是:

代码语言:javascript
代码运行次数:0
复制
FlightClient client = FlightClient.builder()
            .setHost("localhost")
            .setPort(8080)
            .build();

3. 列式计算

保持数据在Arrow格式下进行计算,避免不必要的格式转换。

Python用户可以直接使用pandas进行高效的列式计算:

代码语言:javascript
代码运行次数:0
复制
cursor.fetchallarrow().to_pandas()
...

小结

回到故事的开头,小华用Arrow Flight SQL重构了数据分析流程,查询速度提升了近百倍,内存占用也大幅下降。老板看到这个改进非常满意,还给他加薪升职!

从这件事上,小华也悟出:

技术创新不仅能解决实际问题,还能为个人带来职业发展机会。Arrow Flight SQL好比是给数据插上了翅膀,让数据分析工作真"飞"起来了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-01-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一臻数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据传输提速100倍!
  • 如何做到100倍提升?
  • 让数据传输起飞!
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档