首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >避免多次流查询

避免多次流查询
EN

Stack Overflow用户
提问于 2018-02-13 02:01:59
回答 3查看 3.6K关注 0票数 5

我有一个结构化的流查询,它深入到Kafka。此查询具有复杂的聚合逻辑。

我想将这个查询的输出DF汇到多个Kafka主题,每个主题在不同的‘key’列上分区。我不希望每个不同的Kafka主题都有多个Kafka接收器,因为这意味着要运行多个流查询--每个Kafka主题一个,特别是因为我的聚合逻辑很复杂。

问题:

  1. 是否有一种方法可以将结构化流查询的结果输出到多个Kafka主题,每个主题都有不同的键列,但不必执行多个流查询?
  2. 如果没有,那么将多个查询级联起来,使第一个查询进行复杂聚合并将输出写入Kafka,然后其他查询只读取第一个查询的输出并将其主题写入Kafka,从而避免再次进行复杂的聚合,这样会有效吗?

提前感谢您的帮助。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-03-21 19:19:46

所以答案是盯着我的眼睛看。它也被记录在案。链接到下面。

您可以从一个查询中写入多个Kafka主题。如果您想要编写的dataframe有一个名为" topic“的列(以及"key”和"value“列),它将将一行的内容写入该行中的主题。这是自动工作的。因此,您需要了解的唯一问题是如何生成该列的值。

这是记录在案的- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

票数 10
EN

Stack Overflow用户

发布于 2018-07-13 06:44:19

我也在寻找这个问题的解决方案,在我的例子中,它不一定是卡夫卡沉没。我想在sink1中编写一些数据记录,而在sink2中编写一些其他记录(取决于某些条件,而不是在2个流查询中两次读取相同的数据)。目前,似乎不可能按照当前的实现( createSink()方法在DataSource.scala中提供对单个接收器的支持)。

然而,在Spark2.4.0中出现了一个新的api : foreachBatch(),它将给一个dataframe微批处理提供句柄,该批处理可以用于缓存数据存储、写入不同的接收器或多次处理,然后才能释放aagin。就像这样:

代码语言:javascript
复制
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.uncache()
}

现在,这个特性可以在databricks运行时使用:https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch

编辑15/11/18 :现在可以在Spark2.4.0( https://issues.apache.org/jira/browse/SPARK-24565)中找到

票数 6
EN

Stack Overflow用户

发布于 2018-02-16 04:04:05

没有办法有一个单一的读取和多个写入的结构化流的方框。唯一的方法是实现将写入多个主题的自定义接收器。

每当您调用dataset.writeStream().start() spark时,启动一个从源(readStream())读取并写入接收器(writeStream())的新流。

即使你试图级联它,星星之火也会创建两个不同的流,每个源和一个接收器。换句话说,它将读取、处理和写入数据两次:

代码语言:javascript
复制
Dataset df = <aggregation>; 
StreamingQuery sq1 = df.writeStream()...start(); 
StreamingQuery sq2 = df.writeStream()...start();

有一种方法可以在火花流中缓存读取数据,但是这个选项还不能用于结构化流。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48758392

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档