Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark join种类(>3种)及join选择依据

Spark join种类(>3种)及join选择依据

作者头像
Spark学习技巧
发布于 2021-03-05 06:59:43
发布于 2021-03-05 06:59:43
1.1K00
代码可运行
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧
运行总次数:0
代码可运行

hashjoin

join是作为业务开发绕不开的SQL话题,无论是传统的数据库join,还是大数据里的join。

做过Spark/flink流处理的应该都用过一种流表和维表的join,维表对于Spark来说可以是driver端获取后广播到每个Executor,然后在executor端执行流表task的时候join,其实大多数是个hashmap,而很多时候这个维表比较大会存储于redis/hbase。Flink进行维表join可以用的方式比较多了,比如直接open方法里从外部加载的静态hashmap,这种就无法更新,因为Flink不像Spark可以每个批次或者若干批次加载一次维表。也可以使用LRU+异步IO+外部存储来实现join,这样就实现了对外部更新的感知。甚至也可以使用Flink的广播功能实现join操作。

上面所说的就是比较常见的hashjoin的简单表达,将维表通过join的条件key构建为一个hashtable,就拿java 8的HashMap来说吧,就是一个数组+链表(链表过长会变为红黑树),数组下标就是key,数组存储的是value的指针。

join的时候主表通过join条件构建key去,hashmap里查找。

Spark BroadCastHashJoin

翻过源码之后你就会发现,Spark 1.6之前实现BroadCastHashJoin就是利用的Java的HashMap来实现的。大家感兴趣可以去Spark 1.6的源码里搜索BroadCastHashJoin,HashedRelation,探查一下源码。

具体实现就是driver端根据表的统计信息,当发现一张小表达到广播条件的时候,就会将小表collect到driver端,然后构建一个HashedRelation,然后广播。

其实,就跟我们在使用Spark Streaming的时候广播hashmap一样。

重点强调里面最大行数限制和最大bytes限制并不是我们设置的自动广播参数限制,而是内部存储结构的限制。

还有在Spark后期版本主要就是使用了TaskMemoryManager而不是HashMap进行了背书。

ShuffledHashJoin

BroadCastHashJoin适合的是大表和小表的join策略,将整个小表广播。很多时候,参与join的表本身都不适合广播,也不适合放入内存,但是按照一定分区拆开后就可以放入内存构建为HashRelation。这个就是分治思想了,将两张表按照相同的hash分区器及分区数进行,对join条件进行分区,那么需要join的key就会落入相同的分区里,然后就可以利用本地join的策略来进行join了。

也即是ShuffledHashJoin有两个重要步骤:

  1. join的两张表有一张是相对小表,经过拆分后可以实现本地join。
  2. 相同的分区器及分区数,按照joinkey进行分区,这样约束后joinkey范围就限制在相同的分区中,不依赖其他分区完成join。
  3. 对小表分区构建一个HashRelation。然后就可以完成本地hashedjoin了,参考ShuffleHashJoinExec代码,这个如下图:

SortMergeJoin

上面两张情况都是小表本身适合放入内存或者中表经过分区治理后适合放入内存,来完成本地化hashedjoin,小表数据放在内存中,很奢侈的,所以经常会遇到join,就oom。小表,中表都是依据内存说的,你内存无限,那是最好。

那么,大表和大表join怎么办?这时候就可以利用SortMergeJoin来完成。

SortMergeJoin基本过程如下:

  1. 首先采取相同的分区器及分区数对两张表进行重分区操作,保证两张表相同的key落到相同的分区。
  2. 对于单个分区节点两个表的数据,分别进行按照key排序。
  3. 对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边。‘

Spark 3.1以后的spark版本对sortmergejoin又进一步优化了。

Spark SQL的join方式选择

假如用户使用Spark SQL的适合用了hints,那Spark会先采用Hints提示的join方式。

broadcastHashJoin,hints写法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 支持 BROADCAST, BROADCASTJOIN and MAPJOIN 来表达 broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

ShuffledHashJoin,hints的sql写法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 仅支持 SHUFFLE_HASH 来表达 ShuffledHashJoin hint
SELECT /*+ SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key

SortMergeJoin,hints的SQL写法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- 支持 SHUFFLE_MERGE, MERGE and MERGEJOIN 来表达 SortMergeJoin hintSELECT /*+ MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key

假设用户没有使用hints,默认顺序是:

1.先判断,假设join的表统计信息现实,一张表大小大于0,且小于等于用户配置的自动广播阈值则,采用广播。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
参数:spark.sql.autoBroadcastJoinThreshold

假设两张表都满足广播需求,选最小的。

2.不满足广播就判断是否满足ShuffledHashJoin,首先下面参数要设置为false,默认为true。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.join.preferSortMergeJoin=true

还有两个条件,根据统计信息,表的bytes是广播的阈值*总并行度:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions

并且该表bytes乘以3要小于等于另一张表的bytes:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes

那么这张表就适合分治之后,作为每个分区构建本地hashtable的表。

3.不满足广播,也不满足ShuffledHashJoin,就判断是否满足SortMergeJoin。条件很简单,那就是key要支持可排序。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def createSortMergeJoin() = {
if (RowOrdering.isOrderable(leftKeys)) {
    Some(Seq(joins.SortMergeJoinExec(
      leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right))))
  } else {
    None
  }
}

这段代码是在SparkStrageties类,JoinSelection单例类内部。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
  .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
  .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint)))
  .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
  .getOrElse(createJoinWithoutHint())

当然,这三种join都是等值join,之前的版本Spark仅仅支持等值join但是不支持非等值join,常见的业务开发中确实存在非等值join的情况,spark目前支持非等值join的实现有以下两种,由于实现问题,确实很容易oom。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Broadcast nested loop joinShuffle-and-replicate nested loop join。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark SQL是如何选择join策略的?
我们都知道,Spark SQL上主要有三种实现join的策略,分别是Broadcast hash join、Shuffle hash join、Sort merge join。那Catalyst是依据什么样的规则来选择join策略的?本文来简单补个漏。
王知无-import_bigdata
2020/05/07
2.9K0
Spark SQL如何选择join策略
众所周知,Catalyst Optimizer是Spark SQL的核心,它主要负责将SQL语句转换成最终的物理执行计划,在一定程度上决定了SQL执行的性能。
大数据学习与分享
2020/08/10
1.3K0
SparkSQL 如何选择 join 策略
Join 操作是大数据分析领域必不可少的操作,本文将从原理层面介绍 SparkSQL 支持的五大连接策略及其应用场景。
kk大数据
2024/02/28
5010
SparkSQL 如何选择 join 策略
Spark Join 源码剖析①
在 Spark SQL 中,参与 Join 操作的两张表分别被称为流式表(StreamTable)和构件表(BuildTable),不同表的角色在 Spark SQL 中会通过一定的策略进行设定。通常来讲,系统会将大表设置为 StreamTable,小表设置为 BuildTable。流式表的迭代器为 streamIter,构建表的迭代器为 buildIter。遍历 streamIter 的每一条记录,然后在 buildIter 中查找匹配的记录。这个查找过程称为 build 过程。每次 build 操作的结果为一条 JoinedRow(A, B),其中 A 来自 streamedIter,B 来自 buildIter。
codingforfun
2022/05/23
8780
Spark Join 源码剖析①
【硬刚大数据】从零到大数据专家面试篇之SparkSQL篇
Spark SQL是一个用来处理结构化数据的Spark组件,前身是shark,但是shark过多的依赖于hive如采用hive的语法解析器、查询优化器等,制约了Spark各个组件之间的相互集成,因此Spark SQL应运而生。
大数据真好玩
2021/09/18
2.5K0
【硬刚大数据】从零到大数据专家面试篇之SparkSQL篇
SparkSQL的3种Join实现
Join是SQL语句中的常用操作,良好的表结构能够将数据分散在不同的表中,使其符合某种范式,减少表冗余、更新容错等。而建立表和表之间关系的最佳方式就是Join操作。
王知无-import_bigdata
2019/07/09
3.9K0
SparkSQL的3种Join实现
Spark难点 | Join的实现原理
当前SparkSQL支持三种join算法:Shuffle Hash Join、Broadcast Hash Join以及Sort Merge Join。其中前两者归根到底都属于Hash Join,只不过载Hash Join之前需要先Shuffle还是先Broadcast。其实,Hash Join算法来自于传统数据库,而Shuffle和Broadcast是大数据在分布式情况下的概念,两者结合的产物。因此可以说,大数据的根就是传统数据库。Hash Join是内核。
王知无-import_bigdata
2019/11/18
1.7K0
Spark Adaptive Execution调研
本文阅读价值不错建议大家仔细阅读,感谢作者疯狂哈秋,转自:https://blog.csdn.net/u013332124/article/details/90677676
Spark学习技巧
2019/07/15
1.9K0
Spark Adaptive Execution调研
Spark SQL在100TB上的自适应执行实践
Spark SQL是Apache Spark最广泛使用的一个组件,它提供了非常友好的接口来分布式处理结构化数据,在很多应用领域都有成功的生产实践,但是在超大规模集群和数据集上,Spark SQL仍然遇到不少易用性和可扩展性的挑战。为了应对这些挑战,英特尔大数据技术团队和百度大数据基础架构部工程师在Spark 社区版本的基础上,改进并实现了自适应执行引擎。本文首先讨论Spark SQL在大规模数据集上遇到的挑战,然后介绍自适应执行的背景和基本架构,以及自适应执行如何应对Spark SQL这些问题,最后我们将比
CSDN技术头条
2018/02/06
2.7K0
Spark SQL在100TB上的自适应执行实践
SparkSQL Join深度解析:三种实现方式全揭秘
SparkSQL 现在基本上可以说是离线计算的大拿了,所以掌握了 SparkSQL 的 Join 也就相当于掌握了这位大拿。
shengjk1
2025/05/16
1180
SparkSQL Join深度解析:三种实现方式全揭秘
Spark SQL 之 Join 实现
本文介绍了Spark SQL的Join实现原理、不同Join方式的实现流程、优化策略以及社区现状,为Spark SQL的Join实现提供了全面且深入的解析,有助于开发者深入了解Spark SQL的Join实现细节,从而更好地利用Spark SQL进行数据处理和分析。
涂小刚
2017/07/19
9.6K5
Spark SQL 之 Join 实现
尝尝鲜|Spark 3.1自适应执行计划
每个框架产生都是为了解决一类问题,每个模块的优化也是为了解决一定的场景下的性能瓶颈。浪尖今天分享的关于Spark 3.1之后的自适应执行计划,主要针对以下几个场景,并且有百度率先研发的,不过社区之前一直没有采纳,spark 3.0的预发布版本参数也是不全,到了Spark 3.1的beta版已经可用,浪尖已经完成了测试。
Spark学习技巧
2021/03/05
9160
尝尝鲜|Spark 3.1自适应执行计划
Adaptive Execution 让 Spark SQL 更高效更智能
前面《Spark SQL / Catalyst 内部原理 与 RBO》与《Spark SQL 性能优化再进一步 CBO 基于代价的优化》介绍的优化,从查询本身与目标数据的特点的角度尽可能保证了最终生成的执行计划的高效性。但是
Jason Guo
2018/11/20
1.1K0
SparkSQL中产生笛卡尔积的几种典型场景以及处理策略
【前言:如果你经常使用Spark SQL进行数据的处理分析,那么对笛卡尔积的危害性一定不陌生,比如大量占用集群资源导致其他任务无法正常执行,甚至导致节点宕机。那么都有哪些情况会产生笛卡尔积,以及如何事前"预测"写的SQL会产生笛卡尔积从而避免呢?(以下不考虑业务需求确实需要笛卡尔积的场景)】
大数据学习与分享
2020/08/10
2.4K0
SparkSQL中产生笛卡尔积的几种典型场景以及处理策略
Spark 3.0 新特性 之 自适应查询与分区动态裁剪
Spark憋了一年半的大招后,发布了3.0版本,新特性主要与Spark SQL和Python相关。这也恰恰说明了大数据方向的两大核心:BI与AI。下面是本次发布的主要特性,包括性能、API、生态升级、数据源、SQL兼容、监控和调试等方面的升级。
用户1154259
2020/07/27
1.7K0
SparkSQL的自适应执行-Adaptive Execution
Adaptive Execution 将可以根据执行过程中的中间数据优化后续执行,从而提高整体执行效率。核心在于两点
王知无-import_bigdata
2020/07/03
1.7K0
spark 多表 join
1.Broadcast Hash Join(小表广播,小表Join大表)(分布式改造)
用户6404053
2019/11/03
3.3K0
spark 多表 join
【Spark重点难点08】Spark3.0中的AQE和DPP小总结
包括动态分区剪裁(Dynamic Partition Pruning)、自适应查询执行(Adaptive Query Execution)、加速器感知调度(Accelerator-aware Scheduling)、支持 Catalog 的数据源API(Data Source API with Catalog Supports)、SparkR 中的向量化(Vectorization in SparkR)、支持 Hadoop 3/JDK 11/Scala 2.12 等等。
王知无-import_bigdata
2021/12/22
3.1K0
【Spark重点难点08】Spark3.0中的AQE和DPP小总结
Apache Spark 3.0 自适应查询优化在网易的深度实践及改进
本文基于 Apahce Spark 3.1.1 版本,讲述 AQE 自适应查询优化的原理,以及网易有数在 AQE 实践中遇到的痛点和做出的思考。
大数据真好玩
2021/07/07
1.1K0
第四范式OpenMLDB: 拓展Spark源码实现高性能Join
Spark是目前最流行的分布式大数据批处理框架,使用Spark可以轻易地实现上百G甚至T级别数据的SQL运算,例如单行特征计算或者多表的Join拼接。
代码医生工作室
2021/09/29
1.2K0
第四范式OpenMLDB: 拓展Spark源码实现高性能Join
推荐阅读
相关推荐
Spark SQL是如何选择join策略的?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验