The five main properties of an RDD
One of them is: A list of dependencies on other RDDs
(the dependency relationship).
When an RDD fails or loses data during a run, Spark can use its dependencies to recompute it and recover the data. If rdd4 hits an error mid-computation, it can re-run the whole chain from the beginning, following its dependencies.
Lineage, simply put, answers: who is your parent, who is your parent's parent, who is your parent's parent's parent, and so on. It is like a family tree: through the tree you can trace who your ancestors are. In Spark, you can print an RDD's dependency chain with toDebugString.
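The parent-chain idea can be sketched without Spark at all. Below is a minimal, self-contained Scala sketch, assuming nothing from the Spark API (the names LineageNode, debugString, and ancestors are made up for illustration): each node keeps a reference to its parent, and recovery is just re-walking the chain from the root.

```scala
// Illustrative model of lineage: each RDD-like node remembers its parent,
// mirroring "A list of dependencies on other RDDs" (here simplified to one parent).
case class LineageNode(name: String, parent: Option[LineageNode]) {
  // Walk up the parent chain and render it, loosely in the spirit of toDebugString
  def debugString: String =
    (name :: parent.map(_.debugString).toList).mkString("\n | ")

  // On failure, recomputation means replaying the chain from the root downward
  def ancestors: List[String] =
    parent.map(_.ancestors).getOrElse(Nil) :+ name
}

val hadoopRDD = LineageNode("HadoopRDD[0]", None)
val lines     = LineageNode("MapPartitionsRDD[1]", Some(hadoopRDD))
val words     = LineageNode("MapPartitionsRDD[2]", Some(lines))

println(words.debugString)
println(words.ancestors.mkString(" -> "))
```

The real toDebugString output shown later in this article has the same shape: the RDD itself on the first line, then each ancestor on its own `|`-prefixed line.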
Example: a walkthrough using the worldCount program. The source code comes first, to make the later changes easy to compare against:
@Test
def worldCount(): Unit = {
  // read the file
  val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
  // flatten the lines into words
  val worldList: RDD[String] = lines.flatMap(_.split(" "))
  // group by word
  val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)
  // count the occurrences of each word
  val result = groupList.map(x => (x._1, x._2.size))
  println(result.collect().toList)
}
Use toDebugString to print the dependency chain between the RDDs:
@Test
def worldCount(): Unit = {
  // read the file
  val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
  println("*"*50)
  println(lines.toDebugString)
  println("lines","-"*50)
  // flatten the lines into words
  val worldList: RDD[String] = lines.flatMap(_.split(" "))
  println(worldList.toDebugString)
  println("worldList","-"*50)
  // group by word
  val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)
  println(groupList.toDebugString)
  println("groupList","-"*50)
  // count the occurrences of each word
  val result = groupList.map(x => (x._1, x._2.size))
  println(result.toDebugString)
  println("result","-"*50)
  println(result.collect().toList)
}
Output (note that the `(5)` prefix shows the partition count; textFile's second argument is only a minimum, which is why 5 partitions appear even though we asked for 4):
**************************************************
(5) file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(lines,--------------------------------------------------)
(5) MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(worldList,--------------------------------------------------)
(5) ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
+-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
| MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(groupList,--------------------------------------------------)
(5) MapPartitionsRDD[5] at map at MapAndMapPartitions.scala:190 []
| ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
+-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
| MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(result,--------------------------------------------------)
Dependencies of lines:
(5) file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
The RDD lines depends on the HadoopRDD; the MapPartitionsRDD is lines itself. This step reads the data from the file.
Dependencies of worldList:
Its parent RDD is lines, so it depends on that MapPartitionsRDD, and it also inherits all of its parent's dependencies.
(5) MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
Dependencies of result:
The intermediate RDDs all follow the same pattern, so they are omitted here. By the time we reach result, it inherits not only its parent RDD but the entire chain of dependencies that came before its parent.
(5) MapPartitionsRDD[5] at map at MapAndMapPartitions.scala:190 []
| ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
+-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
| MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
Summary: lineage is the chain of all RDDs across an entire job.
Dependency: the relationship between two RDDs.
Spark RDD dependencies come in two kinds. Sticking with the same example as above:
@Test
def worldCount(): Unit = {
  // read the file
  val lines = sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt", 4)
  println("*"*50)
  println(lines.dependencies)
  println("lines","-"*50)
  // flatten the lines into words
  val worldList: RDD[String] = lines.flatMap(_.split(" "))
  println(worldList.dependencies)
  println("worldList","-"*50)
  // group by word
  val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)
  println(groupList.dependencies)
  println("groupList","-"*50)
  // count the occurrences of each word
  val result = groupList.map(x => (x._1, x._2.size))
  println(result.dependencies)
  println("result","-"*50)
  println(result.collect().toList)
}
Output:
**************************************************
List(org.apache.spark.OneToOneDependency@623ebac7)
(lines,--------------------------------------------------)
List(org.apache.spark.OneToOneDependency@3dd31157)
(worldList,--------------------------------------------------)
List(org.apache.spark.ShuffleDependency@34b9eb03)
(groupList,--------------------------------------------------)
List(org.apache.spark.OneToOneDependency@606f81b5)
(result,--------------------------------------------------)
VS
**************************************************
(5) file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(lines,--------------------------------------------------)
(5) MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(worldList,--------------------------------------------------)
(5) ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
+-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
| MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(groupList,--------------------------------------------------)
(5) MapPartitionsRDD[5] at map at MapAndMapPartitions.scala:190 []
| ShuffledRDD[4] at groupBy at MapAndMapPartitions.scala:185 []
+-(5) MapPartitionsRDD[3] at groupBy at MapAndMapPartitions.scala:185 []
| MapPartitionsRDD[2] at flatMap at MapAndMapPartitions.scala:180 []
| file:///C:/Users/123456/Desktop/worldCount.txt MapPartitionsRDD[1] at textFile at MapAndMapPartitions.scala:174 []
| file:///C:/Users/123456/Desktop/worldCount.txt HadoopRDD[0] at textFile at MapAndMapPartitions.scala:174 []
(result,--------------------------------------------------)
Notice: the RDD groupList has a wide dependency (ShuffleDependency), which triggers a shuffle (you can also tell from the ShuffledRDD in its debug output); all the others are narrow dependencies (OneToOneDependency).
Spark has only two kinds of dependencies: wide dependencies and narrow dependencies (NarrowDependency).
Wide dependency: there is only one, ShuffleDependency.
Narrow dependency: there are three (OneToOneDependency, RangeDependency, and PruneDependency).
Dependencies are what Spark uses to split a job into stages at shuffle boundaries.
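The wide/narrow split can be sketched with a small self-contained model (the sealed trait and case objects below are illustrative stand-ins, not Spark's real class hierarchy): only the shuffle dependency is wide, and a check of exactly this shape is what decides where a new stage must begin.

```scala
// Illustrative model of Spark's two dependency kinds (not Spark source code)
sealed trait Dep
case object OneToOneDep extends Dep // narrow: each child partition reads one parent partition
case object RangeDep    extends Dep // narrow: each child partition reads a range of parent partitions
case object ShuffleDep  extends Dep // wide: each child partition may read many parent partitions

// Only the shuffle dependency is wide; it is where a stage boundary is cut
def isWide(d: Dep): Boolean = d match {
  case ShuffleDep => true
  case _          => false
}

// In the worldCount example: flatMap and map produce one-to-one (narrow)
// dependencies, while groupBy produces the single shuffle (wide) dependency.
println(isWide(OneToOneDep)) // narrow, stays in the same stage
println(isWide(ShuffleDep))  // wide, a new stage starts here
```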
Lineage: the ordered chain of RDDs within a job.
Dependency: the relationship between two RDDs.
This article is a repost. If there is any infringement, please contact cloudcommunity@tencent.com for removal.