前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >数据查询太慢?这个Doris JOIN秘笈让你的SQL提速千百倍!

数据查询太慢?这个Doris JOIN秘笈让你的SQL提速千百倍!

作者头像
一臻数据
发布2024-12-24 16:19:18
发布2024-12-24 16:19:18
18400
代码可运行
举报
文章被收录于专栏:一臻数据一臻数据
运行总次数:0
代码可运行

数据分析这一行,"慢查询"就像职场痛点一样挥之不去。 最近遇到不少分析师小伙伴,动不动抱怨查询跑了好几小时还没结果,只能对着转圈圈的进度条干着急。这不,上周我遇到一位老朋友,他就在为一个大表JOIN性能发愁。 "查询速度比蜗牛还慢,老板都催疯了..."他一脸苦相地说。 作为一名多年摸爬滚打在数据库优化一线的魔芋师,我忍不住笑了:"JOIN慢,那是你还不懂它的脾气秉性。就像武林高手过招,懂得借力使力,方能四两拨千斤。" 今天,基于Doris一起来扒一扒数据JOIN的那些事,看看如何让你的查询化身"闪电侠",让老板对你刮目相看!

Doris JOIN 秘笈:性能优化从选对 JOIN 策略开始

数据分析师小张最近遇到了一个棘手的问题。他正在处理一个大规模数据分析任务,需要对几张超大表进行关联分析。起初他用了最常规的JOIN方式,结果查询速度慢得令人发指,一个查询要跑好几个小时。这可让他犯了难 - 老板要的报表迟迟出不来,催得他焦头烂额。

小张找到了他的老朋友、Doris专家老李诉苦。老李笑着说:"Join性能慢,选对Join策略是关键。Doris支持多种Join实现方式,我来给你'传授秘笈'。"

JOIN的本质

在分布式数据库中,JOIN操作看似简单,实则暗藏玄机。它不仅要完成表的关联,还要在分布式环境下协调数据的流转和计算。

例如,你有两张大表分布在不同的节点上,要完成JOIN,就必须解决一个核心问题 - 如何把要关联的数据凑到一起?这就涉及到了数据的重分布策略。

Doris的JOIN武器库

Doris在JOIN物理实现方式采用了两大主力 - Hash Join和Nest Loop Join。Hash Join就像武林高手的快剑,以迅雷不及掩耳之势完成关联;Nest Loop Join则如大侠的基本功,招式朴实无华但适用场景广泛。

Hash Join通过在内存中构建哈希表,能快速完成等值JOIN操作。好比打王者荣耀时快速找队友的场景 - 系统会给每个玩家分配一个唯一ID,需要时直接通过ID定位,效率自然高。

Nest Loop Join则采用最朴实的方式 - 遍历。它就像春节走亲戚,挨家挨户地拜访,虽然慢一些,但保证不会漏掉任何一家。这种方式适用于所有JOIN场景,包括非等值JOIN。

四大数据分布策略的江湖

作为分布式 MPP 数据库,Apache Doris 在 Hash Join 过程中需要进行数据的 Shuffle,进行拆分调度,以确保 JOIN 结果的正确性。

老李拿出一张图,向小张展示了Doris中JOIN操作的四种数据分布策略:

Broadcast Join - 霸王级选手

Broadcast Join似一位霸道总裁,将右表数据强势复制到每个计算节点。简单粗暴,适用场景广泛。当右表数据量较小时,这种方式简单高效。网络开销随节点数量线性增长。

如图所示,Broadcast Join 的过程涉及将右表的所有数据发送到所有参与 Join 计算的节点,包括左表数据的扫描节点,而左表数据则保持不动。这一过程中,每个节点都会接收到右表的完整数据副本(总量为 T(R) 的数据),以确保所有节点都具备执行 Join 操作所需的数据。

该方法适用于多种通用场景,但不适用于 RIGHT OUTER, RIGHT ANTI, 和 RIGHT SEMI 类型的 Hash Join。其网络开销为 Join 的节点数 N 乘以右表数据量 T(R)。

Partition Shuffle - 中规中矩派

Partition Shuffle采用双向分发策略,将两张表按JOIN键哈希分布。网络开销等于两表数据量之和。这种方式像太极,讲究平衡,适合两表数据量相当的场景。

此方式通过 JOIN 条件计算 Hash 值并进行分桶。具体来说,左右表的数据会根据 JOIN 条件计算得到的 Hash 值进行分区,然后这些分区数据被发送到相应的分区节点上(如上图所示)。

该方法的网络开销主要包括两个部分:传输左表数据 T(S) 所需的开销和传输右表数据 T(R) 所需的开销。该方法的仅支持 Hash Join 操作,因为它依赖于 JOIN 条件来执行数据的分桶操作。

Bucket Shuffle - 精打细算派

Bucket Shuffle利用表的分桶特性,只需重分布右表数据。网络开销仅为右表数据量。这就像武林高手借力打力,善用对方优势。当左表已按JOIN键分桶时特别高效。

即JOIN 条件包含左表的分桶列时,保持左表数据不动,将右表数据分发到左表节点进行 JOIN,减少网络开销。

当参与 Join 操作的某一侧表的数据已经按照 Join 条件列进行了 Hash 分布时,我们可以选择保持这一侧的数据位置不变,而将另一侧的数据依据相同的 Join 条件列,相同的 Hash 分布计算进行数据分发。(这里提到的“表”不仅限于物理存储的表,还可以是 SQL 查询中任意算子的输出结果,并且可以灵活选择保持左表或右表的数据位置不变,而只移动并分发另一侧的表。)

以 Doris 的物理表为例,由于其表数据本身就是通过 Hash 计算进行分桶存储,因此可以直接利用这一特性来优化 Join 操作的数据 Shuffle 过程。假设我们有两张表需要进行 Join,且 Join 列是左表的分桶列,那么在这种情况下,我们无需移动左表的数据,只需根据左表的分桶信息将右表的数据分发到相应的位置,即可完成 Join 计算(如上图所示)。

此过程的网络开销主要来自于右表数据的移动,即 T(R)。

Colocate Join - 绝顶高手

Colocate Join是终极优化,数据提前按同样方式分布,JOIN时无需移动数据。这就像心有灵犀的搭档,配合天衣无缝。零网络开销,性能最优,不过要求最严格。

与 Bucket Shuffle Join 相似,如果参与 Join 的两侧的表,刚好是按照 Join 条件列进行计算的 Hash 分布,那么可以跳过 Shuffle 过程,直接在本地进行 Join 计算。以下通过物理表进行简单说明:

当 Doris 在建表时指定为 DISTRIBUTED BY HASH,那么在数据导入时,系统会根据 Hash 分布键进行数据分发。如果两张表的 Hash 分布键恰好与 Join 条件列一致,那么可以认为这两张表的数据已经按照 Join 的需求进行了预分布,即无需额外的 Shuffle 操作。因此,在实际查询时,可以直接在这两张表上执行 Join 计算。

Colocate Join 示例

介绍Doris的四种Hash Join后,挑个 Colocate Join 绝顶高手来比试一番:

在下面的例子中,t1 和 t2 表都通过 GROUP BY 算子进行了处理,并输出了新的表(此时 tx 和 ty 均按照 c2 进行了 Hash 分布)。随后的 JOIN 条件是 tx.c2 = ty.c2,这恰好满足了 Colocate Join 的条件。

代码语言:javascript
代码运行次数:0
复制
explain select *
from 
    (
        -- t1 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
        select c2 as c2, sum(c1) as c1
        from t1
        group by c2 
    ) tx
join 
    (
        -- t2 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
        select c2 as c2, sum(c1) as c1
        from t2
        group by c2 
    ) ty
on tx.c2 = ty.c2;

从下面的 Explain 执行计划结果中可以看出,8 号 Hash Join 节点的左侧子节点是 7 号聚合节点,右侧子节点是 3 号聚合节点,并且没有出现 Exchange 节点。这表明左侧和右侧子节点聚合后的数据都保持在其原始位置不动,无需进行数据移动,可以直接在本地进行后续的 Hash Join 操作。

代码语言:javascript
代码运行次数:0
复制
+------------------------------------------------------------+
| Explain String(Nereids Planner)                            |
+------------------------------------------------------------+
| PLAN FRAGMENT 0                                            |
|   OUTPUT EXPRS:                                            |
|     c2[#20]                                                |
|     c1[#21]                                                |
|     c2[#22]                                                |
|     c1[#23]                                                |
|   PARTITION: HASH_PARTITIONED: c2[#10]                     |
|                                                            |
|   HAS_COLO_PLAN_NODE: true                                 |
|                                                            |
|   VRESULT SINK                                             |
|      MYSQL_PROTOCAL                                        |
|                                                            |
|   8:VHASH JOIN(373)                                        |
|   |  join op: INNER JOIN(PARTITIONED)[]                    |
|   |  equal join conjunct: (c2[#14] = c2[#6])               |
|   |  cardinality=10                                        |
|   |  vec output tuple id: 9                                |
|   |  output tuple id: 9                                    |
|   |  vIntermediate tuple ids: 8                            |
|   |  hash output slot ids: 6 7 14 15                       |
|   |  final projections: c2[#16], c1[#17], c2[#18], c1[#19] |
|   |  final project output tuple id: 9                      |
|   |  distribute expr lists: c2[#14]                        |
|   |  distribute expr lists: c2[#6]                         |
|   |                                                        |
|   |----3:VAGGREGATE (merge finalize)(367)                  |
|   |    |  output: sum(partial_sum(c1)[#3])[#5]             |
|   |    |  group by: c2[#2]                                 |
|   |    |  sortByGroupKey:false                             |
|   |    |  cardinality=5                                    |
|   |    |  final projections: c2[#4], c1[#5]                |
|   |    |  final project output tuple id: 3                 |
|   |    |  distribute expr lists: c2[#2]                    |
|   |    |                                                   |
|   |    2:VEXCHANGE                                         |
|   |       offset: 0                                        |
|   |       distribute expr lists:                           |
|   |                                                        |
|   7:VAGGREGATE (merge finalize)(354)                       |
|   |  output: sum(partial_sum(c1)[#11])[#13]                |
|   |  group by: c2[#10]                                     |
|   |  sortByGroupKey:false                                  |
|   |  cardinality=10                                        |
|   |  final projections: c2[#12], c1[#13]                   |
|   |  final project output tuple id: 7                      |
|   |  distribute expr lists: c2[#10]                        |
|   |                                                        |
|   6:VEXCHANGE                                              |
|      offset: 0                                             |
|      distribute expr lists:                                |
|                                                            |
| PLAN FRAGMENT 1                                            |
|                                                            |
|   PARTITION: HASH_PARTITIONED: c1[#8]                      |
|                                                            |
|   HAS_COLO_PLAN_NODE: false                                |
|                                                            |
|   STREAM DATA SINK                                         |
|     EXCHANGE ID: 06                                        |
|     HASH_PARTITIONED: c2[#10]                              |
|                                                            |
|   5:VAGGREGATE (update serialize)(348)                     |
|   |  STREAMING                                             |
|   |  output: partial_sum(c1[#8])[#11]                      |
|   |  group by: c2[#9]                                      |
|   |  sortByGroupKey:false                                  |
|   |  cardinality=10                                        |
|   |  distribute expr lists: c1[#8]                         |
|   |                                                        |
|   4:VOlapScanNode(345)                                     |
|      TABLE: tt.t1(t1), PREAGGREGATION: ON                  |
|      partitions=1/1 (t1)                                   |
|      tablets=1/1, tabletList=491188                        |
|      cardinality=21, avgRowSize=0.0, numNodes=1            |
|      pushAggOp=NONE                                        |
|                                                            |
| PLAN FRAGMENT 2                                            |
|                                                            |
|   PARTITION: HASH_PARTITIONED: c1[#0]                      |
|                                                            |
|   HAS_COLO_PLAN_NODE: false                                |
|                                                            |
|   STREAM DATA SINK                                         |
|     EXCHANGE ID: 02                                        |
|     HASH_PARTITIONED: c2[#2]                               |
|                                                            |
|   1:VAGGREGATE (update serialize)(361)                     |
|   |  STREAMING                                             |
|   |  output: partial_sum(c1[#0])[#3]                       |
|   |  group by: c2[#1]                                      |
|   |  sortByGroupKey:false                                  |
|   |  cardinality=5                                         |
|   |  distribute expr lists: c1[#0]                         |
|   |                                                        |
|   0:VOlapScanNode(358)                                     |
|      TABLE: tt.t2(t2), PREAGGREGATION: ON                  |
|      partitions=1/1 (t2)                                   |
|      tablets=1/1, tabletList=491198                        |
|      cardinality=10, avgRowSize=0.0, numNodes=1            |
|      pushAggOp=NONE                                        |
|                                                            |
|                                                            |
| Statistics                                                 |
|  planed with unknown column statistics                     |
+------------------------------------------------------------+
105 rows in set (0.06 sec)

JOIN决策之道

听完老李的讲解,小张豁然开朗。JOIN优化不是简单选择某种方案,而是根据实际情况灵活决策:

大表JOIN小表?

Broadcast Join一把梭

表大小相近?

Partition Shuffle稳妥当

左表分桶合适?

Bucket Shuffle显神威

同组表JOIN?

Colocate Join独领风骚

非等值JOIN?

Nest Loop Join来解围

经过实战,小张总结出一套JOIN优化锦囊:

数据分布先行

提前规划数据分布方式,为高性能JOIN打好基础

分而治之

善用分区裁剪,减少参与JOIN的数据量

巧用分桶

合理设计分桶策略,为Bucket Shuffle和Colocate Join创造条件

资源调优

合理配置并行度和内存,确保JOIN性能最大化

监控警醒

密切关注资源使用情况,及时发现性能瓶颈

小张按照这些经验优化了查询,原本需要几个小时的任务现在只要几分钟就能完成,老板连连称赞。

JOIN江湖博大精深,我们今天探讨的只是其中一隅。不同场景下选择合适的JOIN策略,才能在性能和资源消耗之间找到最佳平衡点。

下期,我们将一起探讨Doris其它更有趣有用有价值的内容,敬请期待!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-11-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一臻数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Doris JOIN 秘笈:性能优化从选对 JOIN 策略开始
    • JOIN的本质
    • Doris的JOIN武器库
  • 四大数据分布策略的江湖
    • Colocate Join 示例
  • JOIN决策之道
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档