在yarn集群中运行之前,对于我在pyspark中的应用程序开发,我想在本地模式下测试它。为此,我需要从工作节点内部显式地写入一些数据,我认为我可以使用hadoop rest api在集群模式下运行时将文件写入hdfs。但是,当在本地模式下运行代码时,我如何从worker任务内部写入文件?
例如:
sparkConf = SparkConf().setAppName("testing").setMaster("local[*]")
sc= SparkContext(conf=sparkConf)
rdd = sc.textFile("file://path to file")
rdd.map(lambda x:x.split("\t")[0],1).reduce(func_to_reduce);
def func_to_reduce(a,b):
//how can i write value of a and b to a file from here
return a+b;
需要说明的是:如果我在驱动程序端代码中使用open()方法写入文件,它就会起作用,但如果我使用相同的方法从reduce函数内部写入文件,则不起作用。任何方向或帮助都是非常感谢的!
发布于 2017-03-08 21:30:57
我之所以提出这个问题,是因为在reduceByKey()函数中,使用os模块创建目录和使用open()方法写入文件是不起作用的。当我在本地模式下运行时,我假设它应该可以访问我的驱动程序的本地路径,并像运行普通的python脚本一样执行这些函数。之所以不是这样,是因为我完全忘记了这样一个事实: reduceByKey更像是一种转换,而不是一种操作,因此reduceByKey()中的代码实际上并没有按照Spark的惰性求值设计运行。因此,我使用collect()对该引用调用了一个操作。现在它启动了reduce功能,并且可以很好地写入文件。感谢大家的帮助!
发布于 2017-03-05 23:44:11
您必须实际调用一个保存方法,就像您通过textFile
调用一个加载方法一样。像saveAsTextFile(path)
这样的东西
发布于 2017-03-06 02:22:42
请参阅rdd.reduce(f)
源码
Spark在使用rdd.reduce()
时调用python内置函数reduce
两次:一次是通过rdd.mapPartitions()
在worker节点中调用,另一次是在rdd.collect()
之后的driver节点中调用。
因此,您不能在rdd.reduce
函数中写入数据
您似乎想要将数据保存在工作节点的本地路径上?
https://stackoverflow.com/questions/42615324
复制相似问题