首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spark 核心编程RDD的使用(1)

1 RDD 创建

在 Spark 中创建 RDD 的创建方式可以分为四种:

1) 从集合(内存)中创建 RDD

从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

def main(args: Array[String]): Unit = {

 //准备环境

 val rddConf = new SparkConf().setMaster("loval[*]").setAppName("RDD")

 val sc = new SparkContext(rddConf)

 //创建RDD

 //从内存中创建RDD,将内存中的集合数据作为处理的数据眼

 val seq = Seq[Int](1, 2, 3, 4)

 //parallelize并行

 //    var value: RDD[Int] = sc.parallelize(seq)

 val value: RDD[Int] = sc.makeRDD(seq)

 // 关闭环境

 sc.stop()

}

从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

def makeRDD[T: ClassTag](

seq: Seq[T],

numSlices: Int = defaultParallelism): RDD[T] = withScope {

parallelize(seq, numSlices)

}

2) 从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,

比如 HDFS、HBase 等。

def main(args: Array[String]): Unit = {

 val rddConf = new SparkConf().setMaster("[local]").setAppName("RDD")

 val context = new SparkContext(rddConf)

 //从文见中创建RDD,将文件中的数据作为处理的数据源

 //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径,还可以是路径

 //此处路径还可以是分布式存储系统路径HDFS   hdfs:hadoop167:8020/test.txt

 //E:\workPlace\sparkdemo\datas\1.txt

 val rdd: RDD[String] = context.textFile("data/1*.txt");

 //testFile:以行为单位读取,读取的数据都是字符串

 //    WholeTestFile :已文件为单位读取,读取的结果为元组,第一个元素为文件路径,第二个元素为元素内容

 val rdd1 = context.wholeTextFiles("datas")

 rdd.collect().foreach(println)

 context.stop()

}

3) 从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD。详情请参考后续章节

4) 直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

这两种方式很少使用

2 RDD 并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

def main(args: Array[String]): Unit = {

//准备环境

val rddConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

// rddConf.set("spark.default.parallelism", "5")

val sc = new SparkContext(rddConf)

val list = List(1, 2, 3, 4)

//创建RDD

//RDD 的并行度& 分区

//makeRDD  方法可以传递第二个参数,这个参数表示分区的数量

//第二个参数是可以不传的,defaultParallelism(默认并行度),

// scheduler.conf.getInt("spark.default.parallelism", totalCores)

//Spark在默认情况下,从配置中获取配置参数:spark.default.parallelism,如果获取不到

//使用totalCores,这个属性取值为当前运行环境的最大可用核数

val rdd: RDD[Int] = sc.makeRDD(list, numSlices = 2)

//将处理的数据保存为分区文件

rdd.saveAsTextFile("output")

// 关闭环境

sc.stop()

}

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的

Spark 核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {

(0 until numSlices).iterator.map { i =>

val start = ((i * length) / numSlices).toInt

val end = (((i + 1) * length) / numSlices).toInt

(start, end)

}

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下

math.min(defaultParallelism, 2)

val rdd: RDD[String] = context.textFile("data/1*.txt",3);

//spark 读取文件的时候,底层其实是使用的Hadoop的读取方式

 //分区的计算方式:

 //totalSize = 7 文件总字节数

 // goalSize = 7 /2 =3  每个分区存放数量

 // 7/3=2...1     大于分区的1.1倍,创建新的文件

// 数据分区的分配

 /* 1.spark读取文件,采用的是hadoop的方式读取的,所以是一行一行读取,和字节数没有关系

 偏移量不会被重复读取

    2.数据读取时以便宜量为单位

    3.数据分区的偏移量计算

 */

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20210317A06C0Z00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券