Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用Spring Embedded Kafka测试@KafkaListener

使用Spring Embedded Kafka测试@KafkaListener
EN

Stack Overflow用户
提问于 2018-05-01 20:48:22
回答 2查看 19.8K关注 0票数 20

我正在尝试为我正在使用Spring Boot2.x开发的Kafka侦听器编写单元测试。作为一个单元测试,我不想启动一个完整的Kafka服务器,一个Zookeeper的实例。因此,我决定使用Spring嵌入式Kafka。

我的听众的定义是非常基本的。

代码语言:javascript
运行
AI代码解释
复制
@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

此外,在接收到消息后验证latch计数器是否等于零的测试也非常简单。

代码语言:javascript
运行
AI代码解释
复制
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

不幸的是,测试失败了,我不明白为什么。是否可以使用KafkaEmbedded的实例来测试用注释@KafkaListener标记的方法

所有代码都在我的GitHub存储库kafka-listener中共享。

感谢所有人。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-05-01 21:17:59

您可能是在为使用者分配主题/分区之前发送消息。设置属性...

代码语言:javascript
运行
AI代码解释
复制
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it默认为latest

这就像在控制台使用者中使用--from-beginning一样。

编辑

哦;你没有使用boot的属性。

添加

代码语言:javascript
运行
AI代码解释
复制
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

顺便说一句,您可能还应该对template.send() ( Future<>)的结果执行get(10L, TimeUnit.SECONDS),以断言发送成功。

EDIT3

要覆盖仅用于测试的偏移量重置,您可以执行与对代理地址相同的操作:

代码语言:javascript
运行
AI代码解释
复制
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

代码语言:javascript
运行
AI代码解释
复制
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

但是,请记住,此属性仅在组第一次使用时应用。要在每次应用程序启动时始终在末尾启动,您必须在启动期间一直搜索到末尾。

此外,我建议将enable.auto.commit设置为false,以便容器负责提交偏移量,而不是仅依赖于消费者客户端按时间计划执行该操作。

票数 14
EN

Stack Overflow用户

发布于 2020-12-28 16:04:46

也许有人会发现这很有用。我也遇到过类似的问题。本地测试正在运行(一些检查是在Awaitility.waitAtMost中执行的),但在Jenkins管道中,测试失败。

正如在投票最多的答案中已经提到的那样,解决方案是设置auto-offset-reset=earliest。当测试正在运行时,您可以通过查看测试日志来检查是否正确设置了配置。生产者和消费者的Spring输出配置

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50123621

复制
相关文章
nodejs总结之日志模块log4js
1 /** 2 * npm install log4js 3 * 源码及文档地址:https://github.com/nomiddlename/log4js-node 4 */ 5 var log4js = require('log4js'); 6 7 /** 8 * 第一种: 9 * configure方法为配置log4js对象,内部有levels、appenders、categories三个属性 10 * levels: 11 *
晓晨
2018/06/22
3K0
制作带有kafka插件和es插件的fluentd镜像
Fluentd是用于统一日志记录层的开源数据收集器,是继Kubernetes、Prometheus、Envoy 、CoreDNS 和containerd后的第6个CNCF毕业项目,常用来对比的是elastic的logstash,相对而言fluentd更加轻量灵活,现在发展非常迅速社区很活跃,在编写这篇blog的时候github的star是8.8k,fork是1k就可见一斑.
用户2672162
2021/02/05
1.1K0
安全的移除Es数据节点步骤
说明:想要安全的移除一个es节点,不改变分片的数量,100%不会引起数据丢失,即保证这个节点的所有数据被其他节点接收。然后停止这个节点的实例。
HLee
2021/01/11
6.1K0
安全的移除Es数据节点步骤
ES6中的模块
最近在做项目的时候发现在一个模块导出的时候是返回一个NEW以后实例化的对象,在其他地方使用的是同一个对象(一直以为是不用的对象,每次导入都是一个新的。。。还是太菜)。
零式的天空
2022/03/21
2660
ES6 模块
在 ES6 前, 实现模块化使用的是 RequireJS 或者 seaJS(分别是基于 AMD 规范的模块化库, 和基于 CMD 规范的模块化库),还有 CommonJS(用于NodeJS)。
Karl Du
2023/10/20
1960
log4js的配置
log4js是一个日志记录模块,可以单独使用,也可以,结合服务框架使用,这里结合express配置来使用。
挥刀北上
2023/05/25
9400
log4js的配置
ES-Spark连接ES后,ES Client节点流量打满分析
问题描述 前段时间用es-spark读取es数遇到了client节点流量打满的现象。es-spark配置的es.nodes是es的域名。由于其中一个client是master节点,然后普通查询变得特别慢,运行20多分钟后,主节点崩溃。 解决方法 临时解决方案:降低es-spark的并发,并重启主节点。 最终解决方案:设置es.nodes.wan.only为false,即不用域名访问。将es.nodes配置为client节点的IP。 原因分析 域名访问时必须配置参数es.nodes.wan.only为true
YG
2018/05/23
3.3K3
log4js日志
该文章介绍了如何利用log4j2的配置文件对日志进行切割和归档,并给出了具体的例子。
用户1141560
2017/12/26
2.3K0
ES6——模块(module)
在 ES6 之前,社区制定了一些模块加载方案,最主要的有 CommonJS 和 AMD 两种,前者用于服务器,后者用于浏览器。CommonJS 和 AMD 模块,都只能在运行时确定这些东西。下面代码的实质是整体加载fs模块(即加载fs的所有方法),生成一个对象(_fs),然后再从这个对象上面读取 3 个方法。这种加载称为“运行时加载”。
羊羽shine
2019/07/15
8680
ES05# Elasticsearch节点指标梳理
节点状态GET _nodes/stats,简单的命令返回大量的指标信息,本文就一探究竟拨开主要指标的含义,文章主要内容有:
瓜农老梁
2022/04/28
4750
ES 节点2G内存分析
ES在如存在2G内存的数据节点,在生产环境使用过程中会经常出现节点离线现象。导致集群频繁异常。
ES小助理
2022/08/04
1.3K0
使用Typescript和ES模块发布Node模块
TypeScript已经成为一种非常流行的JavaScript语言,这是有原因的。它的类型系统和编译器能够在您的软件运行之前的编译时捕获各种bug,并且附加的代码编辑器功能使它成为一个非常适合开发人员的高效环境。
张张
2020/05/27
2.7K0
使用Typescript和ES模块发布Node模块
【模块化】:ES6 模块化
import.meta 是一个给 JavaScript 模块暴露特定上下文的元数据属性的对象。它包含了这个模块的信息,比如说这个模块的URL。
WEBJ2EE
2022/03/30
6180
【模块化】:ES6 模块化
ES6模块化的实现
1.准备工作 <!-- 首先需要在HTML文件中引入两个js文件 类型需要设置为module --> <script src="index.js" type="module"></script> <script src="main.js" type="module"></script> 2.export使用 //main.js //定义一些变量和函数 var name = '张三' var age = 20 var flag = true function sum(num1, num2){ retu
peng_tianyu
2022/12/15
2710
Node.js 中的ES模块现状[每日前端夜话0x8D]
新的 ECMAScript(ES)模块与以前的语言版本不完全兼容,因此使用的 JavaScript 引擎需要知道每一个文件是“旧” JavaScript 代码还是“新”模块。
疯狂的技术宅
2019/07/10
1.4K0
Node.js 中的ES模块现状[每日前端夜话0x8D]
Node.js 12中的ES模块[每日前端夜话0x9E]
多年来,在 JavaScript 生态中出现了不同形式的模块化方案。开发人员使用了明确定义的规范(如 AMD 或 CommonJS)以及简单的编码模式(如通过揭示模块模式(revealing module pattern))来得到模块化解决方案的好处。
疯狂的技术宅
2019/07/30
1.9K0
es6中的模块化
在ES6中每一个模块即是一个文件,在文件中定义的变量,函数,对象在外部是无法获取的。如果你希望外部可以读取模块当中的内容,就必须使用export来对其进行暴露(输出)。先来看个例子,来对一个变量进行模块化。我们先来创建一个test.js文件,来对这一个变量进行输出:
用户1272076
2019/03/26
5610
ES6 模块写法示例
注意: import 和 export 只能在顶级用,不能在代码块中用。否则会报 'import' and 'export' may only appear at the top level。例如
前端GoGoGo
2018/08/24
3620
ES 安全认证模块之XPack
  X-Pack是ES扩展功能,提供安全性,警报,监视,报告,机器学习和许多其他功能。 ES7.0+之后,默认情况下,当安装Elasticsearch时,会安装X-Pack,无需单独再安装。具体查看官方文档相关配置项也在官方文档中, ES版本是8.2.3,环境windows server2012 R2
郑小超.
2023/04/08
1.5K0
ES 安全认证模块之XPack
TypeScript 导出 CommonJS 和 ES 模块
要导出到 TypeScript 中的 CommonJS 和 ES 模块,请在导出中设置默认属性:
小弟调调
2023/08/28
7080
TypeScript 导出 CommonJS 和 ES 模块

相似问题

es6模块到带有类型记录的公共es6的节点协调标志下

12

使用带有nodejs和mocha的es模块

11

ES6 -带有getter的导出模块

23

带有ES6模块的命名空间

13

导入节点js时加载"ES模块“的问题

12
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档