首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花结构化流确认消息

火花结构化流确认消息
EN

Stack Overflow用户
提问于 2020-02-11 20:51:52
回答 1查看 323关注 0票数 0

我使用火花结构流从卡夫卡主题(例如topic1)中读取,并使用接收器写入另一个主题(主题1-结果)。我可以看到,在使用Sink编写到另一个主题之后,这些消息并没有从Topic1中删除。

代码语言:javascript
复制
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .load()

//SINK to another topic 
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("checkpointLocation", "/tmp/checkpoint1")
  .option("topic", "topic1-result")
  .start()

文档说我们不能对结构化流使用自动提交。

enable.auto.commit: Kafka源代码不提交任何偏移量。

但是如何确认消息并从主题(topic1)中删除处理过的消息

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-02-11 21:09:55

两个考虑因素:

  1. 信息不会从卡夫卡删除,一旦你已经承诺。当使用者执行提交时,Kafka会增加此主题对已创建的使用者组的偏移量。但是,根据您为主题配置的保留时间,邮件仍保留在主题中。
  2. 事实上,Kafka源没有提交,流存储指向流的检查点dir中下一条消息的偏移量。因此,当您重新启动流时,它将使用最后一个偏移量。
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60177182

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档