我有一个结构化的流查询,它深入到Kafka。此查询具有复杂的聚合逻辑。
我想将这个查询的输出DF汇到多个Kafka主题,每个主题在不同的‘key’列上分区。我不希望每个不同的Kafka主题都有多个Kafka接收器,因为这意味着要运行多个流查询--每个Kafka主题一个,特别是因为我的聚合逻辑很复杂。
问题:
提前感谢您的帮助。
发布于 2018-03-21 19:19:46
所以答案是盯着我的眼睛看。它也被记录在案。链接到下面。
您可以从一个查询中写入多个Kafka主题。如果您想要编写的dataframe有一个名为" topic“的列(以及"key”和"value“列),它将将一行的内容写入该行中的主题。这是自动工作的。因此,您需要了解的唯一问题是如何生成该列的值。
发布于 2018-07-13 06:44:19
我也在寻找这个问题的解决方案,在我的例子中,它不一定是卡夫卡沉没。我想在sink1中编写一些数据记录,而在sink2中编写一些其他记录(取决于某些条件,而不是在2个流查询中两次读取相同的数据)。目前,似乎不可能按照当前的实现( createSink()方法在DataSource.scala中提供对单个接收器的支持)。
然而,在Spark2.4.0中出现了一个新的api : foreachBatch(),它将给一个dataframe微批处理提供句柄,该批处理可以用于缓存数据存储、写入不同的接收器或多次处理,然后才能释放aagin。就像这样:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.cache()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.uncache()
}现在,这个特性可以在databricks运行时使用:https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch
编辑15/11/18 :现在可以在Spark2.4.0( https://issues.apache.org/jira/browse/SPARK-24565)中找到
发布于 2018-02-16 04:04:05
没有办法有一个单一的读取和多个写入的结构化流的方框。唯一的方法是实现将写入多个主题的自定义接收器。
每当您调用dataset.writeStream().start() spark时,启动一个从源(readStream())读取并写入接收器(writeStream())的新流。
即使你试图级联它,星星之火也会创建两个不同的流,每个源和一个接收器。换句话说,它将读取、处理和写入数据两次:
Dataset df = <aggregation>;
StreamingQuery sq1 = df.writeStream()...start();
StreamingQuery sq2 = df.writeStream()...start();有一种方法可以在火花流中缓存读取数据,但是这个选项还不能用于结构化流。
https://stackoverflow.com/questions/48758392
复制相似问题