我是Flink的新手,我需要从Kafka读取数据,通过使用一些API并写入S3,有条件地丰富这些数据(如果记录属于X类的话)。我用上面的逻辑制作了一个hello world Flink应用程序,它工作起来很有魅力。(选项1)进行指数重试,直到从API获得响应为止,但是这会阻塞队列,因此我不喜欢这样
选项2)使用另一个主题(称为主题失败),并在API关闭时将其发布到。同样
; at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api