首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Scala数据集映射主要工作,但功能不起作用。

Scala数据集映射主要工作,但功能不起作用。
EN

Stack Overflow用户
提问于 2017-11-05 09:28:24
回答 1查看 958关注 0票数 0

我有两个数据集:

代码语言:javascript
运行
复制
implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("app").master("local[1]")
  .config("spark.executor.memory", "1g")
  .getOrCreate()


import spark.implicits._
val ds1 = /*read csv file*/.as[caseClass1]   
val ds2 = /*read csv file*/.as[caseClass2]  

然后我就加入到地图上来:

代码语言:javascript
运行
复制
  val ds3 = ds1.
  joinWith(ds2, ds1("id") === ds2("id"))
  .map{case(left, right) => (left, Option(right))}

得到预期的结果。

问题是,我正在尝试使用该函数和其他一些函数来实现RichDataset,如下所示:

代码语言:javascript
运行
复制
object Extentions {

  implicit class RichDataset[T <: Product](leftDs: Dataset[T]) {

    def leftJoinWith[V <: Product](rightDs: Dataset[V], condition: 
Column)(implicit spark: SparkSession) : Dataset[(T, Option[V])] = {
      import spark.implicits._

      leftDs.joinWith(rightDs, condition, "left")
        .map{case(left, right) => (left, Option(right))}
    }
  }
 }

主要而言,导入Extentions._对leftJoinWith的调用失败:

Error:(15, 13) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. .map{case(left, right) => (left, Option(right))}

Error:(15, 13) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[(T, Option[V])])org.apache.spark.sql.Dataset[(T, Option[V])]. Unspecified value parameter evidence$6. .map{case(left, right) => (left, Option(right))}

..。但是spark.implicits._是在函数中导入的!

如果只返回join,而不是join + map,那么它将在main和函数中同时工作。

scalaVersion := "2.11.8",sparkVersion := "2.2.0“

提前感谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-11-05 13:43:54

如果将TypeTag添加到泛型类型参数中,它可以工作(在Spark的源代码中可以看到这一点):

代码语言:javascript
运行
复制
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Column, Dataset, SparkSession}


object Extentions {

  implicit class RichDataset[T <: Product : TypeTag](leftDs: Dataset[T]) {

    def leftJoinWith[V <: Product : TypeTag](rightDs: Dataset[V], condition:
    Column)(implicit spark: SparkSession) : Dataset[(T, Option[V])] = {
      import spark.implicits._

      leftDs.joinWith(rightDs, condition, "left")
        .map{case(left, right) => (left, Option(right))}
    }
  }
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47120041

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档