我正在编写一个自定义火花结构化流宿写事件从Kafka读取到谷歌BQ(大查询)。下面是我写的代码。
代码正在成功编译和运行。但是我的Sink总是只在一个执行器中运行(总是在驱动程序运行的地方)。我不明白这里的问题。
下面是我的自定义Big Query Sink的实现。
package bq
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
class DefaultSource extends StreamSinkProvider with DataSourceRegister{
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new BQSink(sqlContext, parameters, partitionColumns, outputMode)
}
override def shortName(): String = "bq"
}class BQSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val df = data.sparkSession.createDataFrame
(data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
df.collect().foreach({ row => {
//code that writes the rows to Big Query.
}
}这是我的驱动程序
// Reading raw events from Kafka
val inputDF = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
.option("subscribe", "topic")
.option("fetchOffset.numRetries", 5)
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()
.selectExpr("value")
.as[Array[Byte]];
// Transforming inputDF to OutputDF
val outputDF = inputDF.map(event => transform(event))
// Writing outputDF events to BQ
val query = outputDF.writeStream
.format("bq")
.option("checkpointLocation",config.getString("checkpointLocation"))
.outputMode(OutputMode.Append())
.start()
//Start Streaming
query.awaitTermination()尽管我的主题有多个分区,但我的自定义接收器仅在单个执行器中运行
发布于 2021-02-28 14:53:39
使用df.collect将所有数据从执行器收集到您的驱动程序。因此,您只能看到驱动程序向您的接收器发送数据。
您需要执行df.foreachPartition并使用可在您的执行器上访问的BQ生成器。你可能会遇到“任务不可序列化”的问题,但是你可以看看here来了解如何在Scala Spark中解决这个问题。
https://stackoverflow.com/questions/66404244
复制相似问题