我有一个Spark结构的流媒体应用程序。应用程序从kafka接收数据,并且应该使用这些值作为参数来处理来自cassandra数据库的数据。我的问题是,我如何使用输入数据帧(kafka)中的数据,作为cassandra中的"where“参数"select”,而不会出现以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();这是我的df输入:
val df = spark
.readStream
.format("kafka")
.options(
Map("kafka.bootstrap.servers"-> kafka_bootstrap,
"subscribe" -> kafka_topic,
"startingOffsets"-> "latest",
"fetchOffset.numRetries"-> "5",
"kafka.group.id"-> groupId
))
.load()每当我尝试将数据帧值存储在变量中以用作参数时,都会出现此错误。
这是我创建的将数据转换为变量的方法。这样,spark就会给出我之前提到的错误:
def processData(messageToProcess: DataFrame): DataFrame = {
val messageDS: Dataset[Message] = messageToProcess.as[Message]
val listData: Array[Message] = messageDS.collect()
listData.foreach(x => println(x.country))
val mensagem = messageToProcess
mensagem}
发布于 2021-06-07 01:23:28
当您需要使用Kafka中的数据来查询Cassandra中的数据时,这样的操作是两个数据集之间的典型连接-您不需要调用.collect来查找条目,只需执行连接即可。这是一件非常典型的事情--用来自外部数据集的数据丰富Kafka中的数据,而Cassandra提供了低延迟的操作。
您的代码可能如下所示(您需要配置所谓的DirectJoin,请参阅下面的链接):
import spark.implicits._
import org.apache.spark.sql.cassandra._
val df = spark.readStream.format("kafka")
.options(Map(...)).load()
... decode data in Kafka into columns
val cassdata = spark.read.cassandraFormat("table", "keyspace").load
val joined = df.join(cassdata, cassdata("pk") === df("some_column"))
val processed = ... process joined data
val query = processed.writeStream.....output data somewhere...start()
query.awaitTermination()我有关于如何在Cassandra中执行有效的数据连接的detailed blog post。
发布于 2021-06-06 23:30:33
如错误消息所示,您必须使用writeStream.start()才能执行结构化流查询。您不能在流式数据帧上使用与批处理数据帧相同的操作(如.collect()、.show()或.count()),请参阅Spark Structured streaming文档的Unsupported Operations section。
在您的示例中,您正在尝试对流数据集使用messageDS.collect(),这是不允许的。要实现此目标,您可以使用foreachBatch输出接收器来收集每个微批中所需的行:
streamingDF.writeStream.foreachBatch { (microBatchDf: DataFrame, batchId: Long) =>
// Now microBatchDf is no longer a streaming dataframe
// you can check with microBatchDf.isStreaming
val messageDS: Dataset[Message] = microBatchDf.as[Message]
val listData: Array[Message] = messageDS.collect()
listData.foreach(x => println(x.country))
// ...
}https://stackoverflow.com/questions/67665204
复制相似问题