I am trying to unit test my use of Kafka with Spring Boot, but I am having trouble with the input channel. Below is an excerpt of what I am doing.
public interface MyCustomStreamBinding {
    @Input
    SubscribableChannel consumeChannel();

    @Output
    MessageChannel produceChannel();
}

@EnableBinding(value = { Source.class, MyCustomStreamBinding.class })
public class StreamConfiguration {
    ...
}

@Service
public class MyService {
    private final MyCustomStreamBinding streamBinding;

    public MyService(MyCustomStreamBinding streamBinding) {
        this.streamBinding = streamBinding;
    }

    public void sendMessage() {
        // MessageChannel.send() takes a Message, so the payload is wrapped
        // (MessageBuilder from org.springframework.messaging.support).
        streamBinding.produceChannel().send(MessageBuilder.withPayload(new SomeObject()).build());
    }

    @StreamListener("consumeChannel")
    public void consumeChannel(SomeObject payload) {
        // do processing of payload
    }
}
Then, in my test case:
@SpringBootTest(classes = {MyApp.class})
class MyServiceTest {
    private MyService myService;

    @Autowired
    private MyCustomStreamBinding streamBinding;

    @Autowired
    private MessageCollector messageCollector;

    @BeforeEach
    public void setup() {
        myService = new MyService(streamBinding);
    }

    @Test
    public void TestMessaging() {
        myService.sendMessage();
        Message<?> m = messageCollector.forChannel(streamBinding.produceChannel()).poll();
        assertThat(m.getPayload(), equalTo(new SomeObject()));
    }
}
How can I test consumeChannel and verify that it actually performs the expected processing?
Posted on 2021-04-05 12:13:25
Here is an example made up of two bindings, one that consumes data and one that produces it. Annotate the test with @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"}) to disable the web server. Then use JUnit 5 with @ExtendWith(SpringExtension.class), and start an embedded Kafka cluster with @EmbeddedKafka(topics = {"output-topic"}, partitions = 1).
Take this simple service as an example: it receives data on the process-in-0 binding, converts it to upper case, and emits the new data on the process-out-0 binding.
public interface KafkaListenerBinding {
    @Input("process-in-0")
    KStream<String, String> inputStream();

    @Output("process-out-0")
    KStream<String, String> outStream();
}

@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {
    // log is assumed to be an SLF4J logger, e.g. provided by Lombok's @Slf4j

    @StreamListener("process-in-0")
    @SendTo("process-out-0")
    public KStream<String, String> transformToUpperCase(KStream<String, String> input) {
        input.peek((k, v) -> log.info("Received Input: {}", v));
        return input.mapValues(v -> v.toUpperCase());
    }
}
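The value mapping inside transformToUpperCase can also be checked without any broker if it is pulled out into a plain function. The following is only a minimal sketch under that assumption: UpperCaseTransform and its methods are hypothetical names that do not appear in the original answer, and it uses Locale.ROOT where the service above uses the default locale.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the original answer): the per-value
// mapping of the service, isolated from the Kafka Streams topology.
final class UpperCaseTransform {
    private UpperCaseTransform() {}

    // Upper-cases one value; null is passed through unchanged.
    static String apply(String value) {
        return value == null ? null : value.toUpperCase(Locale.ROOT);
    }

    // Applies the mapping to a batch of values, preserving order.
    static List<String> applyAll(List<String> values) {
        return values.stream().map(UpperCaseTransform::apply).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(applyAll(List.of("hello1", "hello2"))); // prints [HELLO1, HELLO2]
    }
}
```

With such a helper, the service could call input.mapValues(UpperCaseTransform::apply), which leaves the embedded-broker test to cover only the wiring between topics.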
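To test it, use an embedded Kafka cluster. Note that no real Kafka cluster needs to be available. You can then point the binder at the embedded one with the property brokers: ${spring.embedded.kafka.brokers}.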
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"})
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(topics = {"output-topic"}, partitions = 1)
@TestPropertySource(properties = {
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.admin.properties.bootstrap.servers=${spring.embedded.kafka.brokers}"
})
public class KafkaListenerServiceTest {
    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @SpyBean
    KafkaListenerService kafkaListenerServiceSpy;

    private Consumer<String, String> consumer;

    @BeforeEach
    public void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("group1", "true", embeddedKafkaBroker));
        consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        embeddedKafkaBroker.consumeFromAllEmbeddedTopics(consumer);
    }

    @AfterEach
    public void tearDown() {
        consumer.close();
    }

    @Test
    public void SimpleProcessorApplicationTest() throws ExecutionException, InterruptedException {
        Set<String> actualResultSet = new HashSet<>();
        Set<String> expectedResultSet = new HashSet<>();
        expectedResultSet.add("HELLO1");
        expectedResultSet.add("HELLO2");

        // producerProps and getRecords are static imports from KafkaTestUtils
        Map<String, Object> senderProps = producerProps(embeddedKafkaBroker);
        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        try {
            KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
            template.setDefaultTopic("input-topic");

            template.sendDefault("hello1").get();
            verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));

            template.sendDefault("hello2").get();
            verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));

            // Poll the output topic until both transformed records have arrived
            int receivedAll = 0;
            while (receivedAll < 2) {
                ConsumerRecords<String, String> cr = getRecords(consumer);
                receivedAll = receivedAll + cr.count();
                cr.iterator().forEachRemaining(r -> {
                    System.out.println("result: " + r.value());
                    actualResultSet.add(r.value());
                });
            }

            assertThat(actualResultSet).isEqualTo(expectedResultSet);
        } finally {
            pf.destroy();
        }
    }
}
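The while (receivedAll < 2) loop in the test above keeps polling for as long as records are missing, so a broken topology would hang the test instead of failing it. One way to bound the wait is sketched below; BoundedPoll and collect are hypothetical names that are not part of the original answer or of any Kafka or Spring API, and the poller is a stand-in for whatever fetches the next batch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper (not from the original answer): collects items from
// repeated polls until the expected count is reached or a deadline passes.
final class BoundedPoll {
    private BoundedPoll() {}

    static <T> List<T> collect(Supplier<List<T>> poller, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> received = new ArrayList<>();
        while (received.size() < expected) {
            // Fail loudly instead of looping forever when records never arrive.
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("Timed out after " + timeout
                        + " with " + received.size() + " of " + expected + " records");
            }
            received.addAll(poller.get());
        }
        return received;
    }
}
```

In the test, the poller could wrap getRecords(consumer) and copy each record's value into a list. Because that call blocks while it polls the broker, this sketch does not sleep between attempts.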
Configure your application.yml file as follows, and make sure the schema registry is not enabled by setting schema.registry.url: not-used.
spring:
  kafka:
    consumer:
      group-id: group-01
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: input-topic
        process-out-0:
          destination: output-topic
        notification-input-channel:
          destination: pos-topic
      kafka:
        streams:
          binder:
            brokers: ${spring.embedded.kafka.brokers}
            configuration:
              schema.registry.url: not-used
              commit.interval.ms: 100
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            process-in-0:
              consumer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
            process-out-0:
              producer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
---
https://stackoverflow.com/questions/60876315