Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >使用Spring Embedded Kafka测试@KafkaListener

使用Spring Embedded Kafka测试@KafkaListener
EN

Stack Overflow用户
提问于 2018-05-01 20:48:22
回答 2查看 19.8K关注 0票数 20

我正在尝试为我正在使用Spring Boot2.x开发的Kafka侦听器编写单元测试。作为一个单元测试,我不想启动一个完整的Kafka服务器,一个Zookeeper的实例。因此,我决定使用Spring嵌入式Kafka。

我的听众的定义是非常基本的。

代码语言:javascript
运行
AI代码解释
复制
@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

此外,在接收到消息后验证latch计数器是否等于零的测试也非常简单。

代码语言:javascript
运行
AI代码解释
复制
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

不幸的是,测试失败了,我不明白为什么。是否可以使用KafkaEmbedded的实例来测试用注释@KafkaListener标记的方法

所有代码都在我的GitHub存储库kafka-listener中共享。

感谢所有人。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-05-01 21:17:59

您可能是在为使用者分配主题/分区之前发送消息。设置属性...

代码语言:javascript
运行
AI代码解释
复制
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it默认为latest

这就像在控制台使用者中使用--from-beginning一样。

编辑

哦;你没有使用boot的属性。

添加

代码语言:javascript
运行
AI代码解释
复制
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

顺便说一句,您可能还应该对template.send() ( Future<>)的结果执行get(10L, TimeUnit.SECONDS),以断言发送成功。

EDIT3

要覆盖仅用于测试的偏移量重置,您可以执行与对代理地址相同的操作:

代码语言:javascript
运行
AI代码解释
复制
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

代码语言:javascript
运行
AI代码解释
复制
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

但是,请记住,此属性仅在组第一次使用时应用。要在每次应用程序启动时始终在末尾启动,您必须在启动期间一直搜索到末尾。

此外,我建议将enable.auto.commit设置为false,以便容器负责提交偏移量,而不是仅依赖于消费者客户端按时间计划执行该操作。

票数 14
EN

Stack Overflow用户

发布于 2020-12-28 16:04:46

也许有人会发现这很有用。我也遇到过类似的问题。本地测试正在运行(一些检查是在Awaitility.waitAtMost中执行的),但在Jenkins管道中,测试失败。

正如在投票最多的答案中已经提到的那样,解决方案是设置auto-offset-reset=earliest。当测试正在运行时,您可以通过查看测试日志来检查是否正确设置了配置。生产者和消费者的Spring输出配置

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50123621

复制
相关文章
【spring-kafka】@KafkaListener详解与使用
属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3
石臻臻的杂货铺[同名公众号]
2021/07/14
22K1
【spring-kafka】@KafkaListener详解与使用
属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3
全栈程序员站长
2022/11/04
2K0
Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
KafkaBootstrapConfiguration的主要功能是创建两个bean
小小工匠
2021/08/17
8200
Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
Spring Kafka:@KafkaListener 单条或批量处理消息
来源:csdn.net/ldw201510803006/article/details/116176711 消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 图片 可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作; Liste
程序猿DD
2022/05/05
2.3K0
Spring Kafka:@KafkaListener 单条或批量处理消息
Spring Kafka 之 @KafkaListener 单条或批量处理消息
来源:https://blog.csdn.net/ldw201510803006/article/details/116176711 消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和sto
程序猿DD
2022/06/09
1K0
Spring Kafka 之 @KafkaListener 单条或批量处理消息
SpringBoot官方笔记6消息
The Spring Framework provides extensive support for integrating with messaging systems, from simplified use of the JMS API using JmsTemplate to a complete infrastructure to receive messages asynchronously. Spring AMQP provides a similar feature set for the Advanced Message Queuing Protocol. Spring Boot also provides auto-configuration options for RabbitTemplate and RabbitMQ. Spring WebSocket natively includes support for STOMP messaging, and Spring Boot has support for that through starters and a small amount of auto-configuration. Spring Boot also has support for Apache Kafka.
dongfanger
2023/07/20
1580
Kafka消费者接收数据异常,contentType标头始终附加到消息正文
使用Spring Cloud Stream 1.3.2.RELEASE向Kafka发布String消息。 当使用命令行Kafka使用者或Spring Kafka @KafkaListener使用消息时,contentType标头始终附加到消息正文
chenchenchen
2021/09/06
1.1K0
Spring Boot 中使用@KafkaListener并发批量接收消息[通俗易懂]
kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用)。
全栈程序员站长
2022/11/04
4.6K0
Spring Boot 中使用@KafkaListener并发批量接收消息[通俗易懂]
Spring Boot Kafka概览、配置及优雅地实现发布订阅
本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net
别打名名
2019/12/24
15.8K0
Spring Boot Kafka概览、配置及优雅地实现发布订阅
Kafka、Logstash、Nginx日志收集入门
Kafka、Logstash、Nginx日志收集入门 Nginx作为网站的第一入口,其日志记录了除用户相关的信息之外,还记录了整个网站系统的性能,对其进行性能排查是优化网站性能的一大关键。 Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。一般情景下,Logstash用来和ElasticSearch和Kibana搭配使用,简称ELK,本站http://www.wenzhihuai.com除了用作ELK,还配合了K
Zephery
2018/03/12
2.1K0
Kafka、Logstash、Nginx日志收集入门
Kafka、Logstash、Nginx日志收集入门
Nginx作为网站的第一入口,其日志记录了除用户相关的信息之外,还记录了整个网站系统的性能,对其进行性能排查是优化网站性能的一大关键。 Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。一般情景下,Logstash用来和ElasticSearch和Kibana搭配使用,简称ELK。 kafka是一个分布式的基于push-subscribe的消息系统,它具备快速、可扩展、可持久化的特点。它现在是Apache旗下的一个开源系统,作为hadoop生态系统的一部分,被各种商业公司广泛应用。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。 下面是日志系统的搭建
后端码匠
2019/09/02
7920
spring kafka之如何批量给topic加前缀
最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了
lyb-geek
2022/01/06
6280
spring kafka之如何批量给topic加前缀
spring kafka之如何批量给topic加前缀
最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了
lyb-geek
2020/12/05
1.1K0
spring kafka之如何批量给topic加前缀
SpringBoot 整合 Spring-Kafka 深度探秘,踩坑实战
kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。
搜云库技术团队
2019/11/21
4.2K0
实战:彻底搞定 SpringBoot 整合 Kafka(spring-kafka深入探秘)
kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。
Java团长
2019/11/21
51.8K1
集成到ACK、消息重试、死信队列
kafka 是一个消息队列产品,基于 Topic partitions 的设计,能达到非常高的消息发送处理性能。Spring 创建了一个项目 Spring-kafka,封装了 Apache 的 Kafka-client,用于在 Spring 项目里快速集成 kafka。除了简单的收发消息外,Spring-kafka 还提供了很多高级功能,下面我们就来一一探秘这些用法。
用户4283147
2022/10/27
3.5K0
@KafkaListener和@KafkaListeners的使用
@KafkaListeners是@KafkaListener的Container Annotation,这也是jdk8的新特性之一,注解可以重复标注。
小勇DW3
2019/11/14
5.5K0
@KafkaListener和@KafkaListeners的使用
Apache Kafka-通过concurrency实现并发消费
默认情况下, Spring-Kafka @KafkaListener 串行消费的。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。
小小工匠
2021/08/17
7.6K0
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。
小小工匠
2023/06/04
4.6K0
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
SpringBoot集成kafka全面实战「建议收藏」
本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》、《秒懂kafka HA(高可用)》两篇文章。
全栈程序员站长
2022/07/01
5.3K1
SpringBoot集成kafka全面实战「建议收藏」

相似问题

Spring Kafka -如何使用@KafkaListener重试

10

Spring kafka @KafkaListener没有被调用

11

spring kafka @KafkaListener监听器容器?

120

Kafka Kstream和Spring @KafkaListener有什么不同?

25

Spring-Kafka将ConcurrentKafkaListenerContainerFactory用于多个@Kafkalistener

1187
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档