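In Spring Kafka, you can set up multiple listeners while reusing the shared configuration code. A common approach is to define the consumer factory and the listener container factory once, then register one listener endpoint per topic.
Here is a code example: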
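import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

// @EnableKafka already provides the KafkaListenerEndpointRegistry, so none is declared here.
@Configuration
@EnableKafka
public class KafkaConfig implements KafkaListenerConfigurer {

    // Container factory shared by every endpoint registered below.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default group id; each endpoint overrides it through setGroupId().
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerEndpoint endpoint1() {
        return buildEndpoint("listener1", "topic1", "group1", messageListener1());
    }

    @Bean
    public KafkaListenerEndpoint endpoint2() {
        return buildEndpoint("listener2", "topic2", "group2", messageListener2());
    }

    // Shared endpoint setup: only the id, topic, group id and listener differ.
    private KafkaListenerEndpoint buildEndpoint(String id, String topic, String groupId,
            MessageListener<String, String> listener) {
        DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.afterPropertiesSet();
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId(id); // the registrar requires a non-empty endpoint id
        endpoint.setTopics(topic);
        endpoint.setGroupId(groupId);
        endpoint.setBean(listener);
        endpoint.setMessageHandlerMethodFactory(handlerFactory);
        try {
            // MethodKafkaListenerEndpoint invokes a bean method, so onMessage is looked up reflectively.
            endpoint.setMethod(listener.getClass().getMethod("onMessage", ConsumerRecord.class));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Listener has no onMessage(ConsumerRecord) method", e);
        }
        return endpoint;
    }

    @Bean
    public MessageListener<String, String> messageListener1() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                // Message-handling logic for topic1 goes here
            }
        };
    }

    @Bean
    public MessageListener<String, String> messageListener2() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                // Message-handling logic for topic2 goes here
            }
        };
    }

    // Spring Kafka calls this at startup; each endpoint uses the default container factory.
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.registerEndpoint(endpoint1());
        registrar.registerEndpoint(endpoint2());
    }
}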
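In the example above, we create two KafkaListenerEndpoint beans, one listening to "topic1" and the other to "topic2". Each endpoint has its own groupId and its own message-handling logic, while the consumer factory and container factory configuration is written only once.
In this way, multiple Kafka listeners can be set up from the same code. Each listener can subscribe to a different topic and process messages with different logic.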
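The reuse idea can also be sketched without any Spring classes: keep the shared consumer settings in one place and override only what varies per listener. This is a minimal illustration, not Spring Kafka API. The names ListenerProps, baseProps and propsFor are hypothetical; the string keys are the standard Kafka consumer property names behind the ConsumerConfig constants used in the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Kafka: shared consumer settings
// are defined once and only the group id varies per listener.
class ListenerProps {

    // Settings common to every listener (values taken from the example above).
    static Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Copies the shared settings and overrides the group id for one listener.
    static Map<String, Object> propsFor(String groupId) {
        Map<String, Object> props = new HashMap<>(baseProps());
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(propsFor("group1").get("group.id"));
        System.out.println(propsFor("group2").get("group.id"));
    }
}
```

A map built this way could be passed to a DefaultKafkaConsumerFactory when a listener needs its own consumer factory. In the configuration shown earlier, calling setGroupId on each endpoint achieves the same per-listener override.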