一直都有粉丝留言,问各种奇怪的问题,今天就列举一个浪尖反复解答过的问题:编写的spark 代码到底是执行在driver端还是executor端?
1.driver & executor
浪尖这里只提本文关注的两个角色,driver和executor。
首先,driver是用户提交应用程序的入口main函数执行的地方,driver主要作用就是进行job的调度,DAG构建及调度,然后调度task。
然后,executor是执行task地方,然后将结果、状态等汇集到driver,当然executor上执行的task的结果也可以是shuffle中间结果,也可以落地到外部存储。
executor之间目前不能通信,只能借助第三方来共享数据或者通信。
2.单机 vs 多线程 vs 多进程
用户编写的代码都在main函数里,按照单机版的理解,代码肯定是执行于同一台机器,同一个JVM中的。当然,调用类似processbuilder启动的进程除外。这样就可以共享一些变量,一些链接,如数据库连接。
拿段代码举例:
public static void main(String[] args){
Map<String,int> hashmap = new HashMap();
rdd.map(each=>{
if(hashmap.containsKey(each)){
hashmap.put(each,hashmap.get(each)+1);
}else{
hashmap.put(each,1);
}
})
System.out.println(hashmap.size())
}
假如map是就是一个算子,传入的是一个函数,单线程的,主要是想做一个wordcount功能,那么最后输出hashmap的大小肯定不为0.
假如map算子,是多线程线程执行,几个CPU启动几个线程执行,那么hashmap也是不为0,因为是在同一个jvm中,hashmap属于共享堆对象,当然暂不考虑并发问题。这就类似 jdk新特性,并行流处理。
假如rdd就是spark里的rdd,那么map算子传入的函数会封装成一个闭包,然后在driver构建完DAG,划分好stage和task,后driver会调度task到executor端去执行。这个时候,map函数外部的hashmap和内部的hashmap就没有关系了,输出的size是0 。
hashmap这种堆对象,数据库连接,kafka生产消费者等都是这样,不能在rdd的map算子外部声明,内部使用,因为代码都在不同的进程甚至机器中执行,这些对象都不支持跨进程共享,更别提跨机器了。
所有rdd的算子都是如此,所有Dataframe/dataset算子也是如此。
有人该抬杠可,我在idea执行的分明不是0,浪尖,你这解释是错的哦。
那是因为你local模式,进程在同一个jvm中,所以就类似模式二的多线程,当然local多核的话也会出现并发问题。
那要driver和executor 通信怎么办?
第三方存储,广播变量,累加器,executor返回值。
重要|Spark driver端得到executor返回值的方法
3. foreach vs foreachpartition vs foeachrdd
其实,在这里浪尖可以先稍微总结一下:
所有对RDD具体数据的操作都是在executor上执行的,所有对rdd自身的操作都是在driver上执行的。
因为driver端是job,stage,task等生成调度的地方,executor是task执行的地方。job,stage,task生成都离不开rdd自身,而task执行离不开具体的数据。rdd的相关的操作不能缺少driver端的sparkcontext/sparksession。
foreach/foreachPartition都是针对rdd内部数据进行处理的,所以我们传递给这些算子的函数都是执行于executor端的。
Spark源码系列之foreach和foreachPartition的区别
foreachrdd很明显是对rdd进行操作的,所以他的参数函数是在driver端执行的,而foreachrdd的参数函数内部的rdd数据处理,会进一步调度执行于executor端。所以,foreachrdd内部可以使用外部的变量,链接等。当然,foreachrdd的内部rdd的具体算子是不能的。类似的还有transform等。
public static void main(String[] args){
Map<String,int> hashmap = new HashMap();
dstream.foreachrdd(rdd=>{
String today = getToday();
if(hashmap.containsKey(today )){
hashmap.put(today ,hashmap.get(today )+1);
}else{
hashmap.put(today,1);
}
})
System.out.println(hashmap.size())
}
输出的hashmap的size就不为0,因为这段代码是执行于driver的。
public static void main(String[] args){
Map<String,int> hashmap = new HashMap();
dstream.foreachrdd(rdd=>{
rdd.map(each=>{
if(hashmap.containsKey(each)){
hashmap.put(each,hashmap.get(each)+1);
}else{
hashmap.put(each,1);
}
})
})
System.out.println(hashmap.size())
这个时候输出的hashmap的size就为0 ,因为虽然是在foreachrdd内部,但是却是对rdd具体数据的操作,所以是执行于executor端的。
4. 总结
切记:所有对RDD内部具体数据的操作执行都是在executor上进行的,所有对rdd自身的操作都是在driver上执行的。
掌握这些,才能更好理解spark,才能写出好的spark代码,才能做对业务。