“数据分析这一行,"慢查询"就像职场痛点一样挥之不去。 最近遇到不少分析师小伙伴,动不动抱怨查询跑了好几小时还没结果,只能对着转圈圈的进度条干着急。这不,上周我遇到一位老朋友,他就在为一个大表JOIN性能发愁。 "查询速度比蜗牛还慢,老板都催疯了..."他一脸苦相地说。 作为一名多年摸爬滚打在数据库优化一线的魔芋师,我忍不住笑了:"JOIN慢,那是你还不懂它的脾气秉性。就像武林高手过招,懂得借力使力,方能四两拨千斤。" 今天,基于Doris一起来扒一扒数据JOIN的那些事,看看如何让你的查询化身"闪电侠",让老板对你刮目相看!
数据分析师小张最近遇到了一个棘手的问题。他正在处理一个大规模数据分析任务,需要对几张超大表进行关联分析。起初他用了最常规的JOIN方式,结果查询速度慢得令人发指,一个查询要跑好几个小时。这可让他犯了难 - 老板要的报表迟迟出不来,催得他焦头烂额。
小张找到了他的老朋友、Doris专家老李诉苦。老李笑着说:"Join性能慢,选对Join策略是关键。Doris支持多种Join实现方式,我来给你'传授秘笈'。"
在分布式数据库中,JOIN操作看似简单,实则暗藏玄机。它不仅要完成表的关联,还要在分布式环境下协调数据的流转和计算。
例如,你有两张大表分布在不同的节点上,要完成JOIN,就必须解决一个核心问题 - 如何把要关联的数据凑到一起?这就涉及到了数据的重分布策略。
Doris在JOIN物理实现方式采用了两大主力 - Hash Join和Nest Loop Join。Hash Join就像武林高手的快剑,以迅雷不及掩耳之势完成关联;Nest Loop Join则如大侠的基本功,招式朴实无华但适用场景广泛。
Hash Join通过在内存中构建哈希表,能快速完成等值JOIN操作。好比打王者荣耀时快速找队友的场景 - 系统会给每个玩家分配一个唯一ID,需要时直接通过ID定位,效率自然高。
Nest Loop Join则采用最朴实的方式 - 遍历。它就像春节走亲戚,挨家挨户地拜访,虽然慢一些,但保证不会漏掉任何一家。这种方式适用于所有JOIN场景,包括非等值JOIN。
作为分布式 MPP 数据库,Apache Doris 在 Hash Join 过程中需要进行数据的 Shuffle,进行拆分调度,以确保 JOIN 结果的正确性。
老李拿出一张图,向小张展示了Doris中JOIN操作的四种数据分布策略:
Broadcast Join - 霸王级选手
Broadcast Join似一位霸道总裁,将右表数据强势复制到每个计算节点。简单粗暴,适用场景广泛。当右表数据量较小时,这种方式简单高效。网络开销随节点数量线性增长。
如图所示,Broadcast Join 的过程涉及将右表的所有数据发送到所有参与 Join 计算的节点,包括左表数据的扫描节点,而左表数据则保持不动。这一过程中,每个节点都会接收到右表的完整数据副本(总量为 T(R) 的数据),以确保所有节点都具备执行 Join 操作所需的数据。
该方法适用于多种通用场景,但不适用于 RIGHT OUTER, RIGHT ANTI, 和 RIGHT SEMI 类型的 Hash Join。其网络开销为 Join 的节点数 N 乘以右表数据量 T(R)。
Partition Shuffle - 中规中矩派
Partition Shuffle采用双向分发策略,将两张表按JOIN键哈希分布。网络开销等于两表数据量之和。这种方式像太极,讲究平衡,适合两表数据量相当的场景。
此方式通过 JOIN 条件计算 Hash 值并进行分桶。具体来说,左右表的数据会根据 JOIN 条件计算得到的 Hash 值进行分区,然后这些分区数据被发送到相应的分区节点上(如上图所示)。
该方法的网络开销主要包括两个部分:传输左表数据 T(S) 所需的开销和传输右表数据 T(R) 所需的开销。该方法的仅支持 Hash Join 操作,因为它依赖于 JOIN 条件来执行数据的分桶操作。
Bucket Shuffle - 精打细算派
Bucket Shuffle利用表的分桶特性,只需重分布右表数据。网络开销仅为右表数据量。这就像武林高手借力打力,善用对方优势。当左表已按JOIN键分桶时特别高效。
即JOIN 条件包含左表的分桶列时,保持左表数据不动,将右表数据分发到左表节点进行 JOIN,减少网络开销。
当参与 Join 操作的某一侧表的数据已经按照 Join 条件列进行了 Hash 分布时,我们可以选择保持这一侧的数据位置不变,而将另一侧的数据依据相同的 Join 条件列,相同的 Hash 分布计算进行数据分发。(这里提到的“表”不仅限于物理存储的表,还可以是 SQL 查询中任意算子的输出结果,并且可以灵活选择保持左表或右表的数据位置不变,而只移动并分发另一侧的表。)
以 Doris 的物理表为例,由于其表数据本身就是通过 Hash 计算进行分桶存储,因此可以直接利用这一特性来优化 Join 操作的数据 Shuffle 过程。假设我们有两张表需要进行 Join,且 Join 列是左表的分桶列,那么在这种情况下,我们无需移动左表的数据,只需根据左表的分桶信息将右表的数据分发到相应的位置,即可完成 Join 计算(如上图所示)。
此过程的网络开销主要来自于右表数据的移动,即 T(R)。
Colocate Join - 绝顶高手
Colocate Join是终极优化,数据提前按同样方式分布,JOIN时无需移动数据。这就像心有灵犀的搭档,配合天衣无缝。零网络开销,性能最优,不过要求最严格。
与 Bucket Shuffle Join 相似,如果参与 Join 的两侧的表,刚好是按照 Join 条件列进行计算的 Hash 分布,那么可以跳过 Shuffle 过程,直接在本地进行 Join 计算。以下通过物理表进行简单说明:
当 Doris 在建表时指定为 DISTRIBUTED BY HASH,那么在数据导入时,系统会根据 Hash 分布键进行数据分发。如果两张表的 Hash 分布键恰好与 Join 条件列一致,那么可以认为这两张表的数据已经按照 Join 的需求进行了预分布,即无需额外的 Shuffle 操作。因此,在实际查询时,可以直接在这两张表上执行 Join 计算。
介绍Doris的四种Hash Join后,挑个 Colocate Join 绝顶高手来比试一番:
在下面的例子中,t1 和 t2 表都通过 GROUP BY 算子进行了处理,并输出了新的表(此时 tx 和 ty 均按照 c2 进行了 Hash 分布)。随后的 JOIN 条件是 tx.c2 = ty.c2,这恰好满足了 Colocate Join 的条件。
explain select *
from
(
-- t1 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
select c2 as c2, sum(c1) as c1
from t1
group by c2
) tx
join
(
-- t2 表按照 c1 做了 hash 分布,经过 group by 算子后,数据分布变成按照 c2 进行的 hash 分布
select c2 as c2, sum(c1) as c1
from t2
group by c2
) ty
on tx.c2 = ty.c2;
从下面的 Explain 执行计划结果中可以看出,8 号 Hash Join 节点的左侧子节点是 7 号聚合节点,右侧子节点是 3 号聚合节点,并且没有出现 Exchange 节点。这表明左侧和右侧子节点聚合后的数据都保持在其原始位置不动,无需进行数据移动,可以直接在本地进行后续的 Hash Join 操作。
+------------------------------------------------------------+
| Explain String(Nereids Planner) |
+------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS: |
| c2[#20] |
| c1[#21] |
| c2[#22] |
| c1[#23] |
| PARTITION: HASH_PARTITIONED: c2[#10] |
| |
| HAS_COLO_PLAN_NODE: true |
| |
| VRESULT SINK |
| MYSQL_PROTOCAL |
| |
| 8:VHASH JOIN(373) |
| | join op: INNER JOIN(PARTITIONED)[] |
| | equal join conjunct: (c2[#14] = c2[#6]) |
| | cardinality=10 |
| | vec output tuple id: 9 |
| | output tuple id: 9 |
| | vIntermediate tuple ids: 8 |
| | hash output slot ids: 6 7 14 15 |
| | final projections: c2[#16], c1[#17], c2[#18], c1[#19] |
| | final project output tuple id: 9 |
| | distribute expr lists: c2[#14] |
| | distribute expr lists: c2[#6] |
| | |
| |----3:VAGGREGATE (merge finalize)(367) |
| | | output: sum(partial_sum(c1)[#3])[#5] |
| | | group by: c2[#2] |
| | | sortByGroupKey:false |
| | | cardinality=5 |
| | | final projections: c2[#4], c1[#5] |
| | | final project output tuple id: 3 |
| | | distribute expr lists: c2[#2] |
| | | |
| | 2:VEXCHANGE |
| | offset: 0 |
| | distribute expr lists: |
| | |
| 7:VAGGREGATE (merge finalize)(354) |
| | output: sum(partial_sum(c1)[#11])[#13] |
| | group by: c2[#10] |
| | sortByGroupKey:false |
| | cardinality=10 |
| | final projections: c2[#12], c1[#13] |
| | final project output tuple id: 7 |
| | distribute expr lists: c2[#10] |
| | |
| 6:VEXCHANGE |
| offset: 0 |
| distribute expr lists: |
| |
| PLAN FRAGMENT 1 |
| |
| PARTITION: HASH_PARTITIONED: c1[#8] |
| |
| HAS_COLO_PLAN_NODE: false |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 06 |
| HASH_PARTITIONED: c2[#10] |
| |
| 5:VAGGREGATE (update serialize)(348) |
| | STREAMING |
| | output: partial_sum(c1[#8])[#11] |
| | group by: c2[#9] |
| | sortByGroupKey:false |
| | cardinality=10 |
| | distribute expr lists: c1[#8] |
| | |
| 4:VOlapScanNode(345) |
| TABLE: tt.t1(t1), PREAGGREGATION: ON |
| partitions=1/1 (t1) |
| tablets=1/1, tabletList=491188 |
| cardinality=21, avgRowSize=0.0, numNodes=1 |
| pushAggOp=NONE |
| |
| PLAN FRAGMENT 2 |
| |
| PARTITION: HASH_PARTITIONED: c1[#0] |
| |
| HAS_COLO_PLAN_NODE: false |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 02 |
| HASH_PARTITIONED: c2[#2] |
| |
| 1:VAGGREGATE (update serialize)(361) |
| | STREAMING |
| | output: partial_sum(c1[#0])[#3] |
| | group by: c2[#1] |
| | sortByGroupKey:false |
| | cardinality=5 |
| | distribute expr lists: c1[#0] |
| | |
| 0:VOlapScanNode(358) |
| TABLE: tt.t2(t2), PREAGGREGATION: ON |
| partitions=1/1 (t2) |
| tablets=1/1, tabletList=491198 |
| cardinality=10, avgRowSize=0.0, numNodes=1 |
| pushAggOp=NONE |
| |
| |
| Statistics |
| planed with unknown column statistics |
+------------------------------------------------------------+
105 rows in set (0.06 sec)
听完老李的讲解,小张豁然开朗。JOIN优化不是简单选择某种方案,而是根据实际情况灵活决策:
大表JOIN小表?
Broadcast Join一把梭
表大小相近?
Partition Shuffle稳妥当
左表分桶合适?
Bucket Shuffle显神威
同组表JOIN?
Colocate Join独领风骚
非等值JOIN?
Nest Loop Join来解围
经过实战,小张总结出一套JOIN优化锦囊:
数据分布先行
提前规划数据分布方式,为高性能JOIN打好基础
分而治之
善用分区裁剪,减少参与JOIN的数据量
巧用分桶
合理设计分桶策略,为Bucket Shuffle和Colocate Join创造条件
资源调优
合理配置并行度和内存,确保JOIN性能最大化
监控警醒
密切关注资源使用情况,及时发现性能瓶颈
小张按照这些经验优化了查询,原本需要几个小时的任务现在只要几分钟就能完成,老板连连称赞。
JOIN江湖博大精深,我们今天探讨的只是其中一隅。不同场景下选择合适的JOIN策略,才能在性能和资源消耗之间找到最佳平衡点。
下期,我们将一起探讨Doris其它更有趣有用有价值的内容,敬请期待!