我们让多个应用程序使用者收听相同的kafka主题,并且生产者在向主题发送消息时设置消息头,以便特定实例能够评估标头并处理消息。例如
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
log.info("sydney order request received message is {}",message.getName());
}在SpringCloudStream3.0.0中,@StreamListener被废弃了,我无法在函数中找到与条件属性等价的内容。
有什么建议吗?
发布于 2021-09-19 20:11:11
虽然我也找不到功能方法的等效点,但我确实有一个建议。
@StreamListener注释条件不能阻止以下事实:应用程序必须在将消息传递给侦听器(fullfillOrder())之前使用消息、读取其标头并筛选出特定的记录。因此,可以安全地假设您正在使用与主题相关的每一条消息(由Spring为我们实现的事件接收器在幕后实现),但是侦听器只在header ==悉尼时才被执行。
如果有一种方法可以配置Spring使用的事件接收器(在命中侦听器之前丢弃消息),我建议研究一下。如果没有,会在进行任何处理之前过滤掉任何消息(非悉尼)。如果您熟悉Spring的功能方法,应该如下所示:
@Bean
public Consumer<Message<TestObj>> fulfillOrder() {
return msg -> {
// to get header - msg.getHeaders().get(key, valueType);
// filter out bad messages
}
}或
@Bean
public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
return msg -> {
// msg.headers().lastHeader("franchiseName").value() -> filter em out
}
}其他:^我的代码假设您正在通过spring-cloud-stream-binder-kafka将kafka-client API与Spring云流集成。根据列出的标签,我将注意为Kafka提供了两个版本的绑定器--一个用于kafka客户端库,另一个用于kafka流库。
在不考虑Spring /框架的情况下,kafka流中的高lvl不允许您访问标头,但是低级别处理器API提供了访问。从示例来看,您似乎是在利用客户端绑定,而不是spring-cloud-stream-binder-kafka-streams / kafka流绑定。我还没有看到使用低级别处理器API实现spring云流+ kafka流绑定器,所以我无法判断这是否是目标。
https://stackoverflow.com/questions/69216229
复制相似问题