本文主要是一篇总结性文章, 将列举绝大部分的 Spark Transformation算子及其使用方法 和一些使用场景。
该类算子属于 Spark 转换类算子, 不会立即执行, 其需要 Action 算子 来触发, 才能正在执行。 其一定都是有一个返回值,并且类型是 RDD
func
转换,返回一个新的 RDD。
补充:英文中的 dataset 意为 数据集,这里就是我们的 RDDfunc
进行过滤,返回一个新的 RDD。
补充:func
函数返回 false
则过滤,返回true
则保留func
将接受一个迭代器,可以从迭代器中获取每一条数据进行操作,返回一个迭代器。形成一个新的 RDD。
补充: sc.parallelize(Seq(1, 2, 3, 4, 5),1) .mapPartitions(iter => { var res = List[Int]() //创建 mysql 客户端 println("连接数据库") while (iter.hasNext) { val next = iter.next() println("向数据库写入数据:" + next) res = res :+ next } res.toIterator }) .foreach(println) 输出如下,我们可以发现,通过一次连接我们就将一个 partition的数据都写入了数据库, 如果使用的是 Map 算子,那么每写入一条数据都需要一次数据库连接,很明显是不划算的: 连接数据库 向数据库写入数据:1 向数据库写入数据:2 向数据库写入数据:3 向数据库写入数据:4 向数据库写入数据:5 1 2 3 4 5
sc.parallelize(Seq(1, 2, 3, 4, 5),1) .mapPartitions(iter => { var res = List[Int]() println("连接数据库") iter.map(next=>{ println("向数据库写入数据:" + next) next }) }) .foreach(println) 输出如下,其中的差异你可以细细体会, 不但代码更简单, 而且可以防止partition数据过大导致的 OOM 等问题: 连接数据库 向数据库写入数据:1 1 向数据库写入数据:2 2 向数据库写入数据:3 3 向数据库写入数据:4 4 向数据库写入数据:5 5
reduceByKey
or aggregateByKey
will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions
argument to set a different number of tasks.
KV 格式的 RDD才能使用,对 Key 作分组后形成一个 新的 RDD,
这里不建议使用该算子,尽量用 reduceByKey
或者 aggregateByKey
来代替,
这里主要是考虑到数据量的问题,
reduceByKey
或者 aggregateByKey
是会在shuffle的聚合的时候进行一个预聚合,
可以减少数据量,加快运行速度。
不过实际生产中还是会使用的到,这样主要是要注意一下这个问题。groupByKey
, the number of reduce tasks is configurable through an optional second argument.
KV 格式的 RDD才能使用,根据 key 进行分组聚合,形成一个 新的RDD。
某个 key组内 累加 聚合 逻辑大致如下:
代码如下: sc.parallelize(Seq("a" -> 1, "a" -> 2, "a" -> 3, "a" -> 4, "a" -> 5)) .reduceByKey(_+_) .foreach(print)
groupByKey
, the number of reduce tasks is configurable through an optional second argument.
KV 格式的 RDD才能使用,类似于 reduceBykey,但是比 reduceBykey 具有更复杂的操作,其不同主要在于以下几点:
reduceByKey
,
该函数一般还是用来做优化吧,
比如求 topN,我们只需要在每个 partition 上求 topN 在聚合,
而不需要全局聚合后再去求 topN。ascending
argument.
KV 格式的 RDD才能使用, 根据 Key 进行排序,形成一个新的 RDD
ascending:是否是升序leftOuterJoin
, rightOuterJoin
, and fullOuterJoin
.
KV 格式的 RDD才能使用,
类似于我们 sql 里面的 内连接,将两个 RDD 的 key 值进行关联,
返回在两个 RDD 中都存在的 Key 的数据,形成一个 新的RDD。
补充:关于 leftOuterJoin
, rightOuterJoin
, and fullOuterJoin
,和 sql 都差不多,这里就不赘述了。
如果你还不太了解,可以查看这里groupWith
.
KV 格式的 RDD才能使用, 和 groupWith
是一样的
将两个 RDD聚合,并按照 Key 分组,形成一个新的RDD。
val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3)) val r2 = sc.parallelize(Seq("b" -> 1, "c" -> 2, "d" -> 3)) r1.groupWith(r2).foreach(println)
打印如下,其中 CompactBuffer 可以理解为一个优化后的数组:
(d,(CompactBuffer(),CompactBuffer(3))) (a,(CompactBuffer(1),CompactBuffer())) (b,(CompactBuffer(2),CompactBuffer(1))) (c,(CompactBuffer(3),CompactBuffer(2)))
cut
指令,:
hello spark hello python hello scala
实际上你也可以传入任何你shell 脚本的路径,
额外提一句的话,记得在 Linux 上面执行,否则可能会出错,
这里的话就一个参数 envVars
,可以配置一些你的执行环境的参数。
当然实际上除非你 shell 非常 6,否则还是很少用的到的。
filter
算子过滤大部分数据 等方面
补充:警惕数据倾斜,我们一般减少分区的时候是不希望多进行一次 shuffle 的,
所以才使用的该算子,但是不进行 shuffle 的话,
会使得多个分区直接看成一个分区,
如果这多个分区刚好是数据比较大的时候,会导致比较严重的数据倾斜。coalesce
的另外一种表现。
一般我们还是喜欢使用该算子进行分区repartition
and then sorting within each partition because it can push the sorting down into the shuffle machinery.
重新分区,并排序。
相比于repartition
,再进行 排序 ,会有比较好的性能。
因为可以将排序下推到 shuffle 阶段以上这些就是在官网找到的所有 transformation 算子了,还有一些官网中没有提到的...或者说我没在官网中找到的...那么下面我们再加一点点。
输出如下:
((a,1),0)
((b,2),1)
((c,3),2)
((a,4),3)
((b,5),4)
((c,6),5)
补充:需要注意的是,这个index 会因为你使用一些shuffle算子而改变, 所以要注意使用,一般也是用在机器学习算法里面
zipWithIndex
类似,不过其 id 会带上分区的信息,
每个分区的第一条数据 id 即是该分区的分区号,第二条数据的 id = 第一条数据的id + 总分区数
val r1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4, "b" -> 5, "c" -> 6), 3) //d打印分区信息 r1.mapPartitionsWithIndex((x, y) => { println("分区:"+x) val res = y.map(z => { println(z) z }) res }, false) .count() println("===============") r1.zipWithUniqueId().foreach(println)
输出如下,可以观察下其 id 的规则是否如上面所说:
分区:0 (a,1) (b,2) 分区:1 (c,3) (a,4) 分区:2 (b,5) (c,6) =============== ((a,1),0) ((b,2),1) ((c,3),2) ((a,4),3) ((b,5),4) ((c,6),5)
zipPartitions
还有一些重载,
大致使用方法都是一样的,这里就不赘述了.