首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >与自定义Spark结构化流接收器不并行

与自定义Spark结构化流接收器不并行
EN

Stack Overflow用户
提问于 2021-02-28 06:58:23
回答 1查看 64关注 0票数 0

我正在编写一个自定义火花结构化流宿写事件从Kafka读取到谷歌BQ(大查询)。下面是我写的代码。

代码正在成功编译和运行。但是我的Sink总是只在一个执行器中运行(总是在驱动程序运行的地方)。我不明白这里的问题。

下面是我的自定义Big Query Sink的实现。

代码语言:javascript
复制
package bq

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode


class DefaultSource extends StreamSinkProvider with DataSourceRegister{

  override def createSink(
                           sqlContext: SQLContext,
                           parameters: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
    new BQSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "bq"
}
代码语言:javascript
复制
class BQSink(sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode) extends Sink {


  override def addBatch(batchId: Long, data: DataFrame): Unit = {

    val df = data.sparkSession.createDataFrame
                       (data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
   
    df.collect().foreach({ row => {
      //code that writes the rows to Big Query.
    }  
  }

这是我的驱动程序

代码语言:javascript
复制
 // Reading raw events from Kafka
    val inputDF = sparkSession.readStream
                              .format("kafka")
                              .option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
                              .option("subscribe", "topic")
                              .option("fetchOffset.numRetries", 5)
                              .option("failOnDataLoss", "false")
                              .option("startingOffsets", "latest")
                              .load()
                              .selectExpr("value")
                              .as[Array[Byte]];

    // Transforming inputDF to OutputDF
    val outputDF = inputDF.map(event => transform(event))
                          

    // Writing outputDF events to BQ
    val query = outputDF.writeStream
                        .format("bq")
                        .option("checkpointLocation",config.getString("checkpointLocation"))
                        .outputMode(OutputMode.Append())
                        .start()

    //Start Streaming
    query.awaitTermination()

尽管我的主题有多个分区,但我的自定义接收器仅在单个执行器中运行

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-02-28 14:53:39

使用df.collect将所有数据从执行器收集到您的驱动程序。因此,您只能看到驱动程序向您的接收器发送数据。

您需要执行df.foreachPartition并使用可在您的执行器上访问的BQ生成器。你可能会遇到“任务不可序列化”的问题,但是你可以看看here来了解如何在Scala Spark中解决这个问题。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66404244

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档