RDD 分区器
Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认
分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分
区,进而决定了 Reduce 的个数。
只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
1)Hash 分区:对于给定的 key,计算其hashCode,并除以分区个数取余
2)Range 分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而
且分区间有序
3)自定义分区:
rdd1.partitionBy(new MyPartiton)
/**
* 自定义分区器
* 1.继承Partitioner
* 2.重写方法
*/
class MyPartiton extends Partitioner {
//重写分区数量
override def numPartitions: Int = 3
//返回数据的分区索引,从零开始,根据数据的key值返回
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "wnba" => 1
case _ => 2
}
}
}
二 RDD 文件读取与保存
Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。
text 文件
// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")
sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(FlatFile)。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)。
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
object 对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFile[T:
ClassTag](path)函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用
saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
领取专属 10元无门槛券
私享最新 技术干货