我使用火花结构流从卡夫卡主题(例如topic1)中读取,并使用接收器写入另一个主题(主题1-结果)。我可以看到,在使用Sink编写到另一个主题之后,这些消息并没有从Topic1中删除。
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("subscribe", "topic1")
.load()
//SINK to another topic
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("checkpointLocation", "/tmp/checkpoint1")
.option("topic", "topic1-result")
.start()文档说我们不能对结构化流使用自动提交。
enable.auto.commit: Kafka源代码不提交任何偏移量。
但是如何确认消息并从主题(topic1)中删除处理过的消息
发布于 2020-02-11 21:09:55
两个考虑因素:
https://stackoverflow.com/questions/60177182
复制相似问题