首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring @StreamListener条件不推荐替代方案

Spring @StreamListener条件不推荐替代方案
EN

Stack Overflow用户
提问于 2021-09-16 23:31:12
回答 1查看 2.7K关注 0票数 5

我们让多个应用程序使用者收听相同的kafka主题,并且生产者在向主题发送消息时设置消息头,以便特定实例能够评估标头并处理消息。例如

代码语言:javascript
复制
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}

在SpringCloudStream3.0.0中,@StreamListener被废弃了,我无法在函数中找到与条件属性等价的内容。

有什么建议吗?

EN

回答 1

Stack Overflow用户

发布于 2021-09-19 20:11:11

虽然我也找不到功能方法的等效点,但我确实有一个建议。

@StreamListener注释条件不能阻止以下事实:应用程序必须在将消息传递给侦听器(fullfillOrder())之前使用消息、读取其标头并筛选出特定的记录。因此,可以安全地假设您正在使用与主题相关的每一条消息(由Spring为我们实现的事件接收器在幕后实现),但是侦听器只在header ==悉尼时才被执行。

如果有一种方法可以配置Spring使用的事件接收器(在命中侦听器之前丢弃消息),我建议研究一下。如果没有,会在进行任何处理之前过滤掉任何消息(非悉尼)。如果您熟悉Spring的功能方法,应该如下所示:

代码语言:javascript
复制
@Bean
public Consumer<Message<TestObj>> fulfillOrder() {
    return msg -> {
        // to get header - msg.getHeaders().get(key, valueType);
        // filter out bad messages
    }
}

代码语言:javascript
复制
@Bean
public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
    return msg -> {
        // msg.headers().lastHeader("franchiseName").value() -> filter em out
    }
}

其他:^我的代码假设您正在通过spring-cloud-stream-binder-kafka将kafka-client API与Spring云流集成。根据列出的标签,我将注意为Kafka提供了两个版本的绑定器--一个用于kafka客户端库,另一个用于kafka流库。

在不考虑Spring /框架的情况下,kafka流中的高lvl不允许您访问标头,但是低级别处理器API提供了访问。从示例来看,您似乎是在利用客户端绑定,而不是spring-cloud-stream-binder-kafka-streams / kafka流绑定。我还没有看到使用低级别处理器API实现spring云流+ kafka流绑定器,所以我无法判断这是否是目标。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69216229

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档