我已经搜索了一段时间了,如果有任何方法在Pyspark
中使用一个Pyspark
类,我还没有找到任何关于这个主题的文档或指南。
假设我在Scala
中创建了一个简单的类,它使用了一些apache-spark
库,如下所示:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
Pyspark
中使用这个类?.py
文件吗?顺便说一句,我还看了一下spark
代码,我感到有点迷茫,我无法为自己的目的复制它们的功能。
发布于 2016-03-16 00:23:03
是的,这是可能的,虽然可以远不是琐碎的。通常,您需要一个Java (友好)包装器,这样您就不必处理Scala特性,这些特性很难用普通Java来表达,因此不能很好地使用Py4J网关。
假设您的类在包com.example
中,并让Python DataFrame
调用df
df = ... # Python DataFrame
你必须:
--driver-class-path
参数作为PySpark shell / spark-submit
。根据确切的代码,您可能还必须使用--jars
传递它。SparkContext
实例:
jvm = sc._jvmSQLContext
实例中提取Scala SQLContext
:
ssqlContext = sqlContext._ssql_ctxDataFrame
中提取df
:
jdf = df._jdfSimpleClass
的新实例:
simpleObject = jvm.com.example.SimpleClass(ssqlContext,jdf,"v")exe
方法并使用Python DataFrame
包装结果:
来自pyspark.sql import DataFrame DataFrame(simpleObject.exe(),ssqlContext)结果应该是一个有效的PySpark DataFrame
。当然,您可以将所有步骤合并到一个调用中。
重要:只有在仅在驱动程序上执行代码的情况下,这种方法才有可能。它不能在Python操作或转换中使用。详情请参见如何从动作或转换中使用Java/Scala函数?。
发布于 2022-05-05 15:37:38
作为对@zero323 323答案的一个更新,考虑到星火的API在过去六年中已经发生了变化,在Spark-3.2中有效的配方如下:
sbt assembly
)--jars
参数到spark-submit
中,以及本地包定义所需的任何--py-files
参数jvm = spark._jvm
SparkSession
的Java表示jSess = spark._jsparkSession
DataFrame
的Java表示jdf = df._jdf
SimpleClass
的新实例simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
exe
方法并将其输出转换为PySpark DataFrame
from pyspark.sql import DataFrame
result = DataFrame(simpleObject.exe(), spark)
如果您需要传递额外的参数(例如Python字典),PySpark可以自动将它们转换为相应的Java类型,它们在您的Scala方法中出现之前就出现了。Scala提供了JavaConverters
包,以帮助将其转换为更自然的Scala数据类型。例如,Python字典可以传递给Scala方法,并立即从HashMap转换为Scala (可变的) Map:
def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
import scala.collection.JavaConverters._
val params = jparams.asScala
...
}
https://stackoverflow.com/questions/36023860
复制相似问题