有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为Shuffle。
下图是一个简单的Spark Job的运行图,根据宽依赖将任务划分为不同的Stage,
在划分stage时,最后一个stage称为 FinalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。
触发Shuffle的操作大概分为如下几类:
Shuffle的核心思想可以用上图来表示,前一个Stage的 ShuffleMapTask 进行 Shuffle Write, 把数据存储在 BlockManager上面,并且把数据位置元信息上报到Driver 的MapOutTrack组件中,下一个Stage根据数据位置元信息,进行Shuffle Read,拉取上个Stage的输出数据。
1. Map端task个数的确定
Shuffle过程中的task个数由RDD分区数决定,而RDD的分区个数与参数spark.default.parallelism
有关.
在Yarn Cluster模式下,如果没有手动设置,则:
spark.default.parallelism = max(所有executor使用的core总数,2)
。
参与决定分区数的参数defaultMinPartitions
也是由该参数确定的,
defaultMinPartitions=min(spark.default.parallelism, 2)
由于spark对于一个partition中的最大文件大小有限制(spark.files.maxPartitionBytes = 128 M (默认)
),为128M,因此自定义分区时,不能选的过小。
常见的几种情况如下:
2. reduce端的task个数的确定
Reduce端进行数据的聚合,一部分聚合算子可以手动指定并行度,如果没有指定,则以map端的最后一个RDD分区作为其分区数,分区数也就决定了reduce端的task个数。
1. 未优化的HashShuffleManager
相对于传统的 MapReduce,Spark 假定大多数情况下 Shuffle 的数据不需要排序,例如 Word Count,强制排序反而会降低性能。因此不在 Shuffle Read 时做 Merge Sort,如果需要合并的操作的话,则会使用聚合(agggregator),即用了一个 HashMap (实际上是一个 AppendOnlyMap)来将数据进行合并。
在 Map Task 过程按照 Hash 的方式重组 Partition 的数据,不进行排序。每个 Map Task 为每个 Reduce Task 生成一个文件,通常会产生大量的文件(即对应为 M*R 个中间文件,其中 M 表示 Map Task 个数,R 表示 Reduce Task 个数),伴随大量的随机磁盘 I/O 操作与大量的内存开销。
总结下这里的两个严重问题:
2. 优化后的HashShuffleManager
针对上面的第一个问题,Spark做了改进,引入了File Consolidation
机制。
一个Executor上所有的Map Task生成的分区文件只有一份,即将所有的Map Task相同的分区文件合并,这样每个 Executor上最多只生成N个分区文件。
这样就减少了文件数,但是假如下游 Stage 的分区数 N 很大,还是会在每个Executor上生成 N 个文件,同样,如果一个 Executor 上有 K 个 Core,还是会开 K*N 个 Writer Handler,所以这里仍然容易导致OOM。
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
1. 普通运行机制
下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
2. bypass运行机制
下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:
此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于: 第一,磁盘写机制不同; 第二,不会进行排序。 也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
扩展:Tungsten-Sort Based Shuffle / Unsafe Shuffle
从 Spark 1.5.0 开始,Spark 开始了钨丝计划(Tungsten),目的是优化内存和CPU的使用,进一步提升spark的性能。由于使用了堆外内存,而它基于 JDK Sun Unsafe API,故 Tungsten-Sort Based Shuffle 也被称为 Unsafe Shuffle。
它的做法是将数据记录用二进制的方式存储,直接在序列化的二进制数据上 Sort 而不是在 Java 对象上,这样一方面可以减少内存的使用和 GC 的开销,另一方面避免 Shuffle 过程中频繁的序列化以及反序列化。在排序过程中,它提供 cache-efficient sorter,使用一个 8 bytes 的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。
但是使用 Tungsten-Sort Based Shuffle 有几个限制,Shuffle 阶段不能有 aggregate 操作,分区数不能超过一定大小(2^24-1,这是可编码的最大 Parition Id),所以像 reduceByKey 这类有 aggregate 操作的算子是不能使用 Tungsten-Sort Based Shuffle,它会退化采用 Sort Shuffle。
从 Spark-1.6.0 开始,把 Sort Shuffle 和 Tungsten-Sort Based Shuffle 全部统一到 Sort Shuffle 中,如果检测到满足 Tungsten-Sort Based Shuffle 条件会自动采用 Tungsten-Sort Based Shuffle,否则采用 Sort Shuffle。从Spark-2.0.0开始,Spark 把 Hash Shuffle 移除,可以说目前 Spark-2.0 中只有一种 Shuffle,即为 Sort Shuffle。
1. 何时开始fetch上一个stage的数据
当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。
理论上讲一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。
2. 边 fetch 边处理还是一次性 fetch 完再处理?
边fetch边处理。
本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。
Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。
那么如何实现边 shuffle 边处理,而且流入的 records 是无序的? 使用可以 aggregate 的数据结构,比如 HashMap。 每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value),并将 func 的结果重新 put(key) 到 HashMap 中去。
3. fetch 来的数据存放到哪里?
刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。也可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。
内存使用的是AppendOnlyMap,类似Java的HashMap,内存+磁盘使用的是ExternalAppendOnlyMap,如果内存空间不足时,ExternalAppendOnlyMap可以将records进行sort后spill到磁盘上,等到需要它们的时候再进行归并。
4. 怎么获得要 fetch 的数据的存放位置?
一个 ShuffleMapStage形成后,会将该 stage 最后一个 final RDD 注册到 MapOutputTrackerMaster,reducer 在 shuffle 的时候去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置。
每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
参考: