首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Pyspark中使用Scala类

如何在Pyspark中使用Scala类
EN

Stack Overflow用户
提问于 2016-03-15 23:01:09
回答 2查看 19.7K关注 0票数 29

我已经搜索了一段时间了,如果有任何方法在Pyspark中使用一个Pyspark类,我还没有找到任何关于这个主题的文档或指南。

假设我在Scala中创建了一个简单的类,它使用了一些apache-spark库,如下所示:

代码语言:javascript
运行
复制
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}
  • 有没有可能在Pyspark中使用这个类?
  • 是不是太难了?
  • 我必须创建一个.py文件吗?
  • 有什么指南说明如何做到这一点吗?

顺便说一句,我还看了一下spark代码,我感到有点迷茫,我无法为自己的目的复制它们的功能。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-03-16 00:23:03

是的,这是可能的,虽然可以远不是琐碎的。通常,您需要一个Java (友好)包装器,这样您就不必处理Scala特性,这些特性很难用普通Java来表达,因此不能很好地使用Py4J网关。

假设您的类在包com.example中,并让Python DataFrame调用df

代码语言:javascript
运行
复制
df = ... # Python DataFrame

你必须:

  1. 使用您最喜欢的构建工具构建一个jar。
  2. 例如,将它包含在驱动程序类路径中,例如使用--driver-class-path参数作为PySpark shell / spark-submit。根据确切的代码,您可能还必须使用--jars传递它。
  3. 从Python实例中提取SparkContext实例: jvm = sc._jvm
  4. 从一个SQLContext实例中提取Scala SQLContext: ssqlContext = sqlContext._ssql_ctx
  5. DataFrame中提取df: jdf = df._jdf
  6. 创建SimpleClass的新实例: simpleObject = jvm.com.example.SimpleClass(ssqlContext,jdf,"v")
  7. 调用exe方法并使用Python DataFrame包装结果: 来自pyspark.sql import DataFrame DataFrame(simpleObject.exe(),ssqlContext)

结果应该是一个有效的PySpark DataFrame。当然,您可以将所有步骤合并到一个调用中。

重要:只有在仅在驱动程序上执行代码的情况下,这种方法才有可能。它不能在Python操作或转换中使用。详情请参见如何从动作或转换中使用Java/Scala函数?

票数 41
EN

Stack Overflow用户

发布于 2022-05-05 15:37:38

作为对@zero323 323答案的一个更新,考虑到星火的API在过去六年中已经发生了变化,在Spark-3.2中有效的配方如下:

  1. 将Scala代码编译成JAR文件(例如使用sbt assembly)
  2. 将JAR文件包含在--jars参数到spark-submit中,以及本地包定义所需的任何--py-files参数
  3. 提取Python中的JVM实例:
代码语言:javascript
运行
复制
jvm = spark._jvm
  1. 提取SparkSession的Java表示
代码语言:javascript
运行
复制
jSess = spark._jsparkSession
  1. 提取PySpark DataFrame的Java表示
代码语言:javascript
运行
复制
jdf = df._jdf
  1. 创建SimpleClass的新实例
代码语言:javascript
运行
复制
simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
  1. 调用exe方法并将其输出转换为PySpark DataFrame
代码语言:javascript
运行
复制
from pyspark.sql import DataFrame

result = DataFrame(simpleObject.exe(), spark)

如果您需要传递额外的参数(例如Python字典),PySpark可以自动将它们转换为相应的Java类型,它们在您的Scala方法中出现之前就出现了。Scala提供了JavaConverters包,以帮助将其转换为更自然的Scala数据类型。例如,Python字典可以传递给Scala方法,并立即从HashMap转换为Scala (可变的) Map:

代码语言:javascript
运行
复制
def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
  import scala.collection.JavaConverters._
  val params = jparams.asScala
  ...
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36023860

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档