Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >单元测试Spring Stream Kafka

单元测试Spring Stream Kafka
EN

Stack Overflow用户
提问于 2020-03-26 21:21:42
回答 1查看 82关注 0票数 1

我正在尝试用spring boot对kafka的使用进行单元测试,但在输入通道上遇到了问题。下面是我正在做的事情的摘录。

代码语言:javascript
运行
AI代码解释
复制
public interface MyCustomStreamBinding{
   @Input
   SubscribableChannel consumeChannel();

   @Output
   MessageChannel produceChannel();
}

@EnableBinding(value = { Source.class, MyCustomStreamBinding.class })
public class StreamConfiguration {
...
}

@Service
public class MyService {

  private final MyCustomStreamBinding streamBinding;
  public MyService(MyCustomStreamBinding streamBinding) {
    this.streamBinding = streamBinding;
  }

  public void sendMessage() {
    streamBinding.produceChannel().send(new SomeObject);
  }

  @StreamListener("consumeChannel")
  public void consumeChannel(SomeObject payload){
    // do processing of payload
  }
}

然后在我的测试用例中

代码语言:javascript
运行
AI代码解释
复制
@SpringBootTest(classes = {MyApp.class})
class MyServiceTest {
  private MyService myService;

  @Autowired
  private MyCustomStreamBinding streamBinding;
  @Autowired
  private MessageCollector messageCollector;

  @BeforeEach
  public void setup(){
    myService = new MyService(streamBinding);
  }

  @Test
  public void TestMessaging(){
   myService.sendMessage();

   Message<?> m = messageCollector.forChannel(streamBinding.produceChannel()).poll();
   assertThat(m.getPayload(), equalTo(new SomeObject()));
  }
}

如何测试consumeChannel以及它是否实际执行了预期的处理?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-05 12:13:25

这里我有一个示例,它由两个监听器组成,用于消费数据和产生数据。您可以使用@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"})@SpringBootTest一起禁用web服务。然后使用JUnit5 @ExtendWith(SpringExtension.class),使用嵌入式kafka集群@EmbeddedKafka(topics = {"output-topic"}, partitions = 1)

以这个简单的服务为例,它接收侦听器process-in-0上的数据,将其转换为大写,并在侦听器process-out-0上发出新数据。

代码语言:javascript
运行
AI代码解释
复制
public interface KafkaListenerBinding {
    @Input("process-in-0")
    KStream<String, String> inputStream();

    @Output("process-out-0")
    KStream<String, String> outStream();
}

@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {

    @StreamListener("process-in-0")
    @SendTo("process-out-0")
    public KStream<String, String> transformToUpperCase(KStream<String, String> input) {
        input.peek((k, v) -> log.info("Received Input: {}", v));
        return input.mapValues(v -> v.toUpperCase());
    }
}

要测试它,请使用嵌入式kafka集群。请注意,实际的kafka claster不一定要可用。然后,您可以使用属性brokers: ${spring.embedded.kafka.brokers}

代码语言:javascript
运行
AI代码解释
复制
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"})
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(topics = {"output-topic"}, partitions = 1)
@TestPropertySource(properties = {
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.admin.properties.bootstrap.servers=${spring.embedded.kafka.brokers}"
})
public class KafkaListenerServiceTest {

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;
    @SpyBean
    KafkaListenerService kafkaListenerServiceSpy;
    private Consumer<String, String> consumer;

    @BeforeEach
    public void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("group1", "true", embeddedKafkaBroker));
        consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        embeddedKafkaBroker.consumeFromAllEmbeddedTopics(consumer);
    }

    @AfterEach
    public void tearDown() {
        consumer.close();
    }

    @Test
    public void SimpleProcessorApplicationTest() throws ExecutionException, InterruptedException {
        Set<String> actualResultSet = new HashSet<>();
        Set<String> expectedResultSet = new HashSet<>();
        expectedResultSet.add("HELLO1");
        expectedResultSet.add("HELLO2");

        Map<String, Object> senderProps = producerProps(embeddedKafkaBroker);
        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        try {
            KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
            template.setDefaultTopic("input-topic");

            template.sendDefault("hello1").get();
            verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));

            template.sendDefault("hello2").get();
            verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));

            int receivedAll = 0;
            while (receivedAll < 2) {
                ConsumerRecords<String, String> cr = getRecords(consumer);
                receivedAll = receivedAll + cr.count();
                cr.iterator().forEachRemaining(r -> {
                    System.out.println("result: " + r.value());
                    actualResultSet.add(r.value());
                });
            }

            assertThat(actualResultSet.equals(expectedResultSet)).isTrue();
        } finally {
            pf.destroy();
        }
    }
}

并如下配置您的application.yml文件,并确保不使用schema.registry.url: not-used启用模式注册表

代码语言:javascript
运行
AI代码解释
复制
spring:
  kafka:
    consumer:
      group-id: group-01
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: input-topic
        process-out-0:
          destination: output-topic
        notification-input-channel:
          destination: pos-topic
      kafka:
        streams:
          binder:
            brokers: ${spring.embedded.kafka.brokers}
            configuration:
              schema.registry.url: not-used
              commit.interval.ms: 100
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            process-in-0:
              consumer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
            process-out-0:
              producer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
---
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60876315

复制
相关文章
Spring Cloud Stream整合Kafka
KafkaProperties-> Consumer->valueDeserializer
用户1215919
2021/12/28
1.3K0
Kafka及Spring Cloud Stream
下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
HUC思梦
2020/09/03
1.2K0
Spring Cloud Stream与Kafka集成
Spring Cloud Stream是一个用于构建消息驱动的微服务的框架,它为Spring Boot应用程序提供了与消息代理集成的声明式模型。在本文中,我们将探讨如何使用Spring Cloud Stream与Kafka集成,以及如何构建一个使用Kafka作为消息代理的Spring Boot应用程序。
堕落飞鸟
2023/04/12
1.5K0
Spring Cloud Stream与Kafka集成示例
下面是一个完整的示例,它使用Spring Cloud Stream和Kafka来创建一个简单的消息处理器和发布器:
堕落飞鸟
2023/04/12
1.2K0
spring-cloud-stream-binder-kafka属性配置
本文简单介绍下spring-cloud-stream-binder-kafka的一些属性配置。
code4it
2018/09/17
3.9K0
Kafka设计解析(七)- Kafka Stream
Kafka Stream背景 Kafka Stream是什么 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。 Kafka Stream的特点如下: Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署 除了Kafka外,无任何外部依赖 充分利用Kafka分区机制实现水平扩展和顺序性保证 通过可容错的state store实
Jason Guo
2018/06/20
2.3K2
kafka stream简要分析
kafka历史背景 Kafka是2010年Kafka是Linkedin于2010年12月份开源的消息系统,我接触的不算早,大概14年的时候,可以看看我们14年写的文章《高速总线kafka介绍》。 消息总线一直是作IT系统集成的核心概念,IBM/oracle等传统厂商都有相关中间件产品。传统消息中间件解决是消息的传输,一般支持AMQP协议来实现,如RabbitMQ。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求
大数据和云计算技术
2018/03/08
1.3K0
kafka stream简要分析
kafka stream word count实例
kafka呢其实正道不是消息队列,本质是日志存储系统,而stream processing是其最近大力推广的特性,本文简单介绍下word count的实例。
code4it
2018/09/17
1K0
Kafka核心API——Stream API
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。
端碗吹水
2020/09/23
3.7K0
Kafka核心API——Stream API
kafka stream errorlog报警实例
log4j-core-2.7-sources.jar!/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
code4it
2018/09/17
6460
Kafka Stream(KStream) vs Apache Flink
腾讯云流计算 Oceanus 是大数据实时化分析利器,兼容 Apache Flink 应用程序。新用户可以 1 元购买流计算 Oceanus(Flink) 集群,欢迎读者们体验使用。
吴云涛
2021/11/28
4.9K0
Kafka Stream(KStream) vs Apache Flink
Spark Stream对接kafka 源码分析
本文会讲解Spark Stream是如何与Kafka进行对接的,包括DirectInputStream和KafkaRDD是如何与KafkaConsumer交互的
平凡的学生族
2020/06/29
9760
单元测试(Spring)
YGingko
2017/12/28
4.7K0
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
在这个博客系列的第1部分之后,Apache Kafka的Spring——第1部分:错误处理、消息转换和事务支持,在这里的第2部分中,我们将关注另一个增强开发者在Kafka上构建流应用程序时体验的项目:Spring Cloud Stream。
架构师研究会
2019/10/23
2.6K0
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
Spring单元测试
单元测试对开发来说是一种基本素养。Java这方面的工业标准是使用JUnit。在使用了Spring框架及其衍生的相关框架后,会有不同程度的变化。
李鸿坤
2020/11/03
1.2K0
​玩转Kafka—Spring整合Kafka
参考文章:https://www.cnblogs.com/angelyan/p/10800739.html
闫同学
2022/10/31
8660
​玩转Kafka—Spring整合Kafka
Spring Cloud Task 集成Spring Cloud Stream
pring Cloud Task和Spring Cloud Stream都是Spring Cloud的组件,它们都提供了处理消息的功能。
堕落飞鸟
2023/04/17
7600
Spring cloud stream【消息分组】
  上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决了!
用户4919348
2019/07/03
1.1K0
Spring cloud stream【消息分组】
Spring Cloud 系列之 Spring Cloud Stream
比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。
古时的风筝
2019/09/29
1.7K0
Spring Cloud 系列之 Spring Cloud Stream
Spring Cloud Stream和 Kafka 的那点事,居然还有人没搞清楚?
Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
Java程序猿阿谷
2020/07/29
1.9K0
Spring Cloud Stream和 Kafka 的那点事,居然还有人没搞清楚?

相似问题

Spring Kafka Stream未编写

129

Spring cloud stream with bind kafka

15

Spring cloud stream / Kafka异常

2138

如何使用Kafka DSL对Spring Cloud Stream进行单元测试

168

如何使用Kafka Streams对Spring Cloud Stream进行单元测试

114
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档