1 RDD 创建
在 Spark 中创建 RDD 的创建方式可以分为四种:
1) 从集合(内存)中创建 RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
def main(args: Array[String]): Unit = {
//准备环境
val rddConf = new SparkConf().setMaster("loval[*]").setAppName("RDD")
val sc = new SparkContext(rddConf)
//创建RDD
//从内存中创建RDD,将内存中的集合数据作为处理的数据眼
val seq = Seq[Int](1, 2, 3, 4)
//parallelize并行
// var value: RDD[Int] = sc.parallelize(seq)
val value: RDD[Int] = sc.makeRDD(seq)
// 关闭环境
sc.stop()
}
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
2) 从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,
比如 HDFS、HBase 等。
def main(args: Array[String]): Unit = {
val rddConf = new SparkConf().setMaster("[local]").setAppName("RDD")
val context = new SparkContext(rddConf)
//从文见中创建RDD,将文件中的数据作为处理的数据源
//path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径,还可以是路径
//此处路径还可以是分布式存储系统路径HDFS hdfs:hadoop167:8020/test.txt
//E:\workPlace\sparkdemo\datas\1.txt
val rdd: RDD[String] = context.textFile("data/1*.txt");
//testFile:以行为单位读取,读取的数据都是字符串
// WholeTestFile :已文件为单位读取,读取的结果为元组,第一个元素为文件路径,第二个元素为元素内容
val rdd1 = context.wholeTextFiles("datas")
rdd.collect().foreach(println)
context.stop()
}
3) 从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。详情请参考后续章节
4) 直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
这两种方式很少使用
2 RDD 并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
def main(args: Array[String]): Unit = {
//准备环境
val rddConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
// rddConf.set("spark.default.parallelism", "5")
val sc = new SparkContext(rddConf)
val list = List(1, 2, 3, 4)
//创建RDD
//RDD 的并行度& 分区
//makeRDD 方法可以传递第二个参数,这个参数表示分区的数量
//第二个参数是可以不传的,defaultParallelism(默认并行度),
// scheduler.conf.getInt("spark.default.parallelism", totalCores)
//Spark在默认情况下,从配置中获取配置参数:spark.default.parallelism,如果获取不到
//使用totalCores,这个属性取值为当前运行环境的最大可用核数
val rdd: RDD[Int] = sc.makeRDD(list, numSlices = 2)
//将处理的数据保存为分区文件
rdd.saveAsTextFile("output")
// 关闭环境
sc.stop()
}
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的
Spark 核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下
math.min(defaultParallelism, 2)
val rdd: RDD[String] = context.textFile("data/1*.txt",3);
//spark 读取文件的时候,底层其实是使用的Hadoop的读取方式
//分区的计算方式:
//totalSize = 7 文件总字节数
// goalSize = 7 /2 =3 每个分区存放数量
// 7/3=2...1 大于分区的1.1倍,创建新的文件
// 数据分区的分配
/* 1.spark读取文件,采用的是hadoop的方式读取的,所以是一行一行读取,和字节数没有关系
偏移量不会被重复读取
2.数据读取时以便宜量为单位
3.数据分区的偏移量计算
*/
领取专属 10元无门槛券
私享最新 技术干货