首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将数据帧中的数据存储在变量中,以用作cassandra中select的参数?

如何将数据帧中的数据存储在变量中,以用作cassandra中select的参数?
EN

Stack Overflow用户
提问于 2021-05-24 07:24:44
回答 2查看 99关注 0票数 0

我有一个Spark结构的流媒体应用程序。应用程序从kafka接收数据,并且应该使用这些值​​作为参数来处理来自cassandra数据库的数据。我的问题是,我如何使用输入数据帧(kafka)中的数据,作为cassandra中的"where“参数"select”,而不会出现以下错误:

代码语言:javascript
复制
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

这是我的df输入:

代码语言:javascript
复制
 val df = spark
  .readStream
  .format("kafka")
  .options(
    Map("kafka.bootstrap.servers"-> kafka_bootstrap,
      "subscribe" -> kafka_topic,
      "startingOffsets"-> "latest",
      "fetchOffset.numRetries"-> "5",
      "kafka.group.id"-> groupId
    ))
  .load()

每当我尝试将数据帧值​​存储在变量中以用作参数时,都会出现此错误。

这是我创建的将数据转换为变量的方法。这样,spark就会给出我之前提到的错误:

代码语言:javascript
复制
def processData(messageToProcess: DataFrame): DataFrame = {

val messageDS: Dataset[Message] = messageToProcess.as[Message]

val listData: Array[Message] = messageDS.collect()

listData.foreach(x => println(x.country))

val mensagem = messageToProcess

mensagem

}

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-06-07 01:23:28

当您需要使用Kafka中的数据来查询Cassandra中的数据时,这样的操作是两个数据集之间的典型连接-您不需要调用.collect来查找条目,只需执行连接即可。这是一件非常典型的事情--用来自外部数据集的数据丰富Kafka中的数据,而Cassandra提供了低延迟的操作。

您的代码可能如下所示(您需要配置所谓的DirectJoin,请参阅下面的链接):

代码语言:javascript
复制
import spark.implicits._
import org.apache.spark.sql.cassandra._

val df = spark.readStream.format("kafka")
  .options(Map(...)).load()
... decode data in Kafka into columns
val cassdata = spark.read.cassandraFormat("table", "keyspace").load
val joined = df.join(cassdata, cassdata("pk") === df("some_column"))
val processed = ... process joined data

val query = processed.writeStream.....output data somewhere...start()
query.awaitTermination()

我有关于如何在Cassandra中执行有效的数据连接的detailed blog post

票数 1
EN

Stack Overflow用户

发布于 2021-06-06 23:30:33

如错误消息所示,您必须使用writeStream.start()才能执行结构化流查询。您不能在流式数据帧上使用与批处理数据帧相同的操作(如.collect().show().count()),请参阅Spark Structured streaming文档的Unsupported Operations section

在您的示例中,您正在尝试对流数据集使用messageDS.collect(),这是不允许的。要实现此目标,您可以使用foreachBatch输出接收器来收集每个微批中所需的行:

代码语言:javascript
复制
streamingDF.writeStream.foreachBatch { (microBatchDf: DataFrame, batchId: Long) =>
    // Now microBatchDf is no longer a streaming dataframe
    // you can check with microBatchDf.isStreaming
    
    val messageDS: Dataset[Message] = microBatchDf.as[Message]

    val listData: Array[Message] = messageDS.collect()

    listData.foreach(x => println(x.country))
    
    // ...
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67665204

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档