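In a Spring Cloud or plain Spring Boot application, configuring multiple Kafka instances (that is, multiple KafkaTemplate and ConcurrentKafkaListenerContainerFactory beans) is a common requirement, for example when connecting to Kafka clusters in different environments (development, testing, production) or in different business domains.
The following is a complete example of configuring multiple Kafka instances with Spring Boot + Spring Kafka.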
Suppose we need to connect to two Kafka clusters:
Cluster A (order system): bootstrap-servers=192.168.1.10:9092
Cluster B (user system): bootstrap-servers=192.168.1.11:9092
Dependencies (pom.xml)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Configuration (application.yml)
# Kafka cluster A (order system)
kafka:
  order:
    bootstrap-servers: 192.168.1.10:9092
    consumer:
      group-id: order-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  # Kafka cluster B (user system)
  user:
    bootstrap-servers: 192.168.1.11:9092
    consumer:
      group-id: user-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
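One detail worth spelling out: the keys under kafka.order and kafka.user are application-defined names. Spring Boot's own Kafka auto-configuration reads spring.kafka.*, and the Kafka client itself expects dotted property names such as bootstrap.servers and key.serializer, so the values have to be translated explicitly. The following is a minimal plain-Java sketch of that translation. The class KafkaClusterSettings and its method names are hypothetical and are not part of Spring or Kafka; only the property-name strings are the ones the Kafka client reads.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for one cluster's settings, mirroring the YAML layout above. */
final class KafkaClusterSettings {
    private final String bootstrapServers;
    private final String groupId;
    private final String autoOffsetReset;

    KafkaClusterSettings(String bootstrapServers, String groupId, String autoOffsetReset) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
        this.autoOffsetReset = autoOffsetReset;
    }

    /** Producer settings keyed by the property names the Kafka client reads. */
    Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    /** Consumer settings keyed by the property names the Kafka client reads. */
    Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", autoOffsetReset);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Under these assumptions, new KafkaClusterSettings("192.168.1.10:9092", "order-group", "earliest").consumerConfig() yields a map that could be handed to a DefaultKafkaConsumerFactory for cluster A, and a second instance would cover cluster B.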
Create a configuration class that defines the ProducerFactory, ConsumerFactory, KafkaTemplate, and listener container factory for each of the two Kafka instances.
package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    // Values are read from the kafka.order.* and kafka.user.* keys in application.yml.
    @Value("${kafka.order.bootstrap-servers}")
    private String orderBootstrapServers;
    @Value("${kafka.order.consumer.group-id}")
    private String orderGroupId;
    @Value("${kafka.order.consumer.auto-offset-reset}")
    private String orderAutoOffsetReset;
    @Value("${kafka.user.bootstrap-servers}")
    private String userBootstrapServers;
    @Value("${kafka.user.consumer.group-id}")
    private String userGroupId;
    @Value("${kafka.user.consumer.auto-offset-reset}")
    private String userAutoOffsetReset;

    // ==================== Kafka cluster A (order system) ====================

    // Each bean needs a unique name; the config map must not reuse the factory's name.
    @Bean("orderProducerConfigs")
    public Map<String, Object> orderProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, orderBootstrapServers);
        // Serializers are set in code and mirror the producer section of application.yml.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean("orderProducerFactory")
    public ProducerFactory<String, Object> orderProducerFactory() {
        return new DefaultKafkaProducerFactory<>(orderProducerConfigs());
    }

    @Bean("orderKafkaTemplate")
    public KafkaTemplate<String, Object> orderKafkaTemplate() {
        return new KafkaTemplate<>(orderProducerFactory());
    }

    @Bean("orderConsumerConfigs")
    public Map<String, Object> orderConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, orderBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, orderGroupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderAutoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Values arrive as JSON text and are handed to the listener as a String.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("orderConsumerFactory")
    public ConsumerFactory<String, Object> orderConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfigs());
    }

    @Bean("orderKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> orderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        return factory;
    }

    // ==================== Kafka cluster B (user system) ====================

    @Bean("userProducerConfigs")
    public Map<String, Object> userProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, userBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean("userProducerFactory")
    public ProducerFactory<String, Object> userProducerFactory() {
        return new DefaultKafkaProducerFactory<>(userProducerConfigs());
    }

    @Bean("userKafkaTemplate")
    public KafkaTemplate<String, Object> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }

    @Bean("userConsumerConfigs")
    public Map<String, Object> userConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, userBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, userGroupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, userAutoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("userConsumerFactory")
    public ConsumerFactory<String, Object> userConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(userConsumerConfigs());
    }

    @Bean("userKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }
}
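// MessageService.java: sends messages through the cluster-specific templates
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

    @Autowired
    @Qualifier("orderKafkaTemplate") // inject the order-system KafkaTemplate
    private KafkaTemplate<String, Object> orderKafkaTemplate;

    @Autowired
    @Qualifier("userKafkaTemplate") // inject the user-system KafkaTemplate
    private KafkaTemplate<String, Object> userKafkaTemplate;

    // Publishes to cluster A (order system)
    public void sendOrderMessage(String topic, Object message) {
        orderKafkaTemplate.send(topic, message);
    }

    // Publishes to cluster B (user system)
    public void sendUserMessage(String topic, Object message) {
        userKafkaTemplate.send(topic, message);
    }
}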
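If the number of clusters grows, one method per cluster becomes repetitive. A possible alternative is to route by business domain, sketched below in plain Java. ClusterRouter, register, and send are hypothetical names introduced for illustration; each BiConsumer stands in for a call such as orderKafkaTemplate.send(topic, message), so the sketch runs without a broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical router: picks the sender registered for a business domain. */
final class ClusterRouter {
    // domain name -> function that sends (topic, message) to that domain's cluster
    private final Map<String, BiConsumer<String, Object>> senders = new HashMap<>();

    /** Registers the sender used for the given domain, replacing any earlier one. */
    void register(String domain, BiConsumer<String, Object> sender) {
        senders.put(domain, sender);
    }

    /** Sends the message through the domain's sender, or fails if none is registered. */
    void send(String domain, String topic, Object message) {
        BiConsumer<String, Object> sender = senders.get(domain);
        if (sender == null) {
            throw new IllegalArgumentException("No Kafka cluster registered for domain: " + domain);
        }
        sender.accept(topic, message);
    }
}
```

In a Spring application the senders could be registered once at startup, for example router.register("order", (topic, msg) -> orderKafkaTemplate.send(topic, msg)), after which callers use router.send("order", "order-topic", payload). Failing fast on an unknown domain avoids silently publishing to the wrong cluster.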
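// KafkaListeners.java: consumes from each cluster through its own container factory
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaListeners {

    // Uses the order system's listener container factory
    @KafkaListener(topics = "order-topic", groupId = "order-group",
            containerFactory = "orderKafkaListenerContainerFactory")
    public void listenOrder(String message) {
        System.out.println("Received order message: " + message);
    }

    // Uses the user system's listener container factory
    @KafkaListener(topics = "user-topic", groupId = "user-group",
            containerFactory = "userKafkaListenerContainerFactory")
    public void listenUser(String message) {
        System.out.println("Received user message: " + message);
    }
}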
Component | Order system (A) | User system (B) |
---|---|---|
ProducerFactory | @Bean("orderProducerFactory") | @Bean("userProducerFactory") |
KafkaTemplate | @Bean("orderKafkaTemplate") | @Bean("userKafkaTemplate") |
ConsumerFactory | @Bean("orderConsumerFactory") | @Bean("userConsumerFactory") |
Listener factory | @Bean("orderKafkaListenerContainerFactory") | @Bean("userKafkaListenerContainerFactory") |
Usage | @Qualifier("xxx") + containerFactory = "xxx" | Same as A |
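Use @Qualifier: when several beans of the same type exist, @Qualifier states which one to inject.
Set containerFactory: each @KafkaListener must name its listener container factory through the containerFactory attribute. If the attribute is omitted, Spring Kafka looks for a bean named kafkaListenerContainerFactory.
Spring Cloud Stream: a similar result can be achieved there by configuring multiple binders and assigning bindings to them, but the native Spring Kafka configuration is more direct. The approach shown here is stable and widely used in production.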
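Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com for removal.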