DAG
,在离散数学中我们学到了一个 有向无环图(Directed Acyclic Graph) 的概念,再生产环境中,我写的任务仅仅是 有向树(Directed tree) 级别,有向无环图还未遇到过。
但是可以想象到,如果在代码中使用了 RDD 的 join
算子是有可能出现 有向无环图 的 DAG。对于我们组所使用的日志数据处理,主要还是集中在 有向树复杂度的 逻辑拓扑。PS: 有向树一定是 有向无环图,有向无环图不一定都是有向树。可以自行脑补一下 将流程抽象为拓扑能够更好的将在其中添加各种优化措施,而不是像 Hadoop MapReduce 一般将每一步的结果都写回,造成大量的浪费。
real_data.map(deal_data_func).reduceByKey(merge_data_func).foreachRDD(store_data_func)
store_data_func
中 使用 foreachPartition
进行与存储化介质之间的联通。在 Spark 中,该方法称作 action
transformation
和 action
,当且仅当action
被调用时,Spark
才会真正将任务提交至 DAG调度器
,进而分配至 Task调度器
如果在编写 Spark 项目时,仅仅做了 transformation 但并未提交 action,这时候 Spark Would do nothing!
real_data.map(deal_data_func).reduceByKey(merge_data_func)
这种写法放在寻常非Spark项目中一点也不意外,甚至可以认为是完整的。 这是与 MapReduce 最大的区别之一,因为 MapReduce 没有所谓的 Stage 划分,导致很多人看了网上的老代码,在新入手 Spark 时陷入这个误区。
action
之后才真正执行计算,是为了充分利用 DAG
划分 Stage
带来的优势,包括但不限于 减少计算量,I/O负载 等
transformation
操作中,上篇提到,其又分为两类: 宽依赖(reduceByKey, ...),窄依赖(map,flatMap, ...)Partition
中的每个数据做一次映射,Partition
数目不变Partition
及其数据。Partition
数是初学者使用时最大的疑惑以及黑盒(包括我),在某天我终于忍不住,去查了源码,以 reduceByKey
为例子:
# reduceByKey 有三种函数签名,一目了然 1.def reduceByKey( partitioner: Partitioner, func: JFunction2[V, V, V] ) 2.def reduceByKey( func: JFunction2[V, V, V], numPartitions: Int ) 3.def reduceByKey( func: JFunction2[V, V, V] )3
2
较 3
多了一个参数 numPartitions
,这个参数代表我们可以人为指定 Partition
数目,所以很多网上说的 Partition
为 Spark
自己生成的 带有一定的误导性,但这个函数仅当十分了解 Spark
调度原理时才使用。1
是本次的重点,其第一个参数是 Partitioner
类型的变量,我们可以猜测,如果我们使用 3
时,既不指定 numPartitions
也不指定一个 Partitioner
那必然有一个 default
的东西,用于确定 reduceByKey
后的 Partition
数量3
的函数实现中我们看见了 defaultPartitioner
的实例化,并调用了 1
:
fromRDD(reduceByKey(defaultPartitioner(rdd), func)) # 签名,可以看出,该实例是至少要传入一个 rdd 作为参数的 def defaultPartitioner( rdd: RDD[_], others: RDD[_]* )defaultPartitioner
是 Spark 一个自带的实现,其内实现了一段设置 新RDDPartition
逻辑spark.default.parallelism
参数 则 新RDD 的 Partition
数为 旧RDD 中最大的spark.default.parallelism
参数,则 新RDD 的 Partition
数目为该参数的值。
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) { rdd.context.defaultParallelism } else { rdds.map(_.partitions.length).max }spark.default.parallelism 参数在 Spark 项目初始化之时设置,保存在 SparkContext 中,用过 Spark 的人不会陌生,一般而言这个值设置成
Excutor * Excutor-core * 2
至此弄懂了Partition
数量的计算由来,相关看更详细的源码操作,可以阅读 Spark Core 中的 Partitioner.scala 文件,很简洁。
Partition
数目之所以重要,缘由便在于其在很大程度上决定了 Spark 的 并发 效果,上篇文章提到, RDD 的 Partition
·与其所处 Stage
中的 task
一一对应,这也是这个spark.default.parallelism
参数名字的由来。在 Spark 的 Patch 中对于 Partition 数目的选择一直是一个热议,大家有兴趣可以看看例如这个 Patch(https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-6377) ,但截至目前 Spark-2.3.2,依旧是我上述的结论 但是实际上 Spark SQL 已经有了一个动态调整 Partition 数量的功能代码, 1spark.sql.adaptive.enable=true1
Stage
的划分应该是最好理解的,或者说并不需要深究源码级别的理解,实际使用中,我们最需要留意的地方,便是在何时会发生 Shuffle
,而Stage
的划分就是为了找出 Shuffle
最该发生的位置Shuffle
的发生意味着,数据可能会在不同节点间的迁移,内存向文件的写出操作,内存读取文件内容的一系列损耗较大的操作,90%以上的场景是需要越少 Shuffle
越好。transformation
的替换来达到这个目的,最经典的 用 reduceByKey
替换 groupByKey
就不再赘述,原理就是 前者会将本机数据先做一次聚合,再传输到其他节点上,减少Shuffle
Stage
在 Spark 本质中就是一系列可以 并行 执行的 task
的集合,划分 Stage
的标准便是 宽依赖 的出现。以文章开头处的例子为原型reduceByKey
时,Shuffle
便开始了,如果你的 Spark 是一套用有 多 个节点的集群reduceByKey
,得到一份本地的唯一 <key,value>
Shuffle-Write(如Write Disk)
,由DAGScheduler
选择分配哪些数据到哪个节点(defaultpartitioner
决定)Shuffle-Read(如Read Network)
主动拉取数据key
都是全局唯一的Shuffle
的消耗,除了减少 Shuffle
的产生次数。还要尽量减少每次 Shuffle
的数据量大小。在
Shuffle
过后,我们的项目场景一般就需要存储计算结果,而计算结果的存放又在一定程度上决定了这批次任务是否能真正完成,大致可分为 就地存储 和 集中存储,将在下篇详述。
这部分细节,实际上对实际项目中的应用没什么太大帮助,纯粹是了解一下 Spark 的内在,只需要知道的是, Shuffle 带来的各种 IO 无法避免,Spark 正在不断新增各种优化算法,来降低这部分的开销。 此处需要设定场景,我们用的是默认存储介质,
Shuffle Write
是向本地磁盘写入数据。
transformation
时,依旧以上面的例子为背景。Shuffle Write
,Spark 会先确定本次的 分区器(Partitioner
),由上面内容可知,分区器的作用有二:reduceByKey
Partitioner
决定哪些 key
的数据属于哪个分区,且在内存中按分区序号排序,当内存不足时,写出到磁盘,并带上索引文件,以标识不同分区数据(此文件是按序排列)。1
处的 Task
与 旧RDD的 Partition
一一对应,在3
阶段做一次合并。4
阶段的 Task
代表远端 Shuffle Read
的Task
,其数量与 新RDD 的 Partition
相同且一一对应。此处有太多细节没有详述,因为 Shuffle Write 的算法有不少, Spark 根据情况来选择用哪种算法输出文件减少性能损耗。 上边所说的情况亦是其中的一种
SortShuffle
而已
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。