首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spring Cloud中配置多个 Kafka 实例的示例

Spring Cloud中配置多个 Kafka 实例的示例

原创
作者头像
技术文章分析
发布2025-09-22 13:26:06
发布2025-09-22 13:26:06
15200
代码可运行
举报
文章被收录于专栏:技术技术
运行总次数:0
代码可运行

在 Spring Cloud 或普通的 Spring Boot 应用中,配置多个 Kafka 实例(即多个 KafkaTemplateConcurrentKafkaListenerContainerFactory)是一个常见的需求,例如需要连接到不同环境(开发、测试、生产)或不同业务域的 Kafka 集群。

以下是使用 Spring Boot + Spring Kafka 配置多个 Kafka 实例的完整示例。

代码语言:javascript
代码运行次数:0
运行
复制
G.CWQ.INFO23丨M.OEJ.INFO13丨G.KQO.INFO10丨K.CWQ.INFO45
P.OEJ.INFO47丨A.KQO.INFO67丨B.CWQ.INFO80丨R.XCI.INFO85
K.KQO.INFO20丨I.XCI.INFO62丨N.KXL.INFO15丨D.KQO.INFO50
C.OEJ.INFO80丨C.KQO.INFO59丨P.XCI.INFO64丨T.XCI.INFO19
Q.KQO.INFO37丨F.CWQ.INFO10丨R.KQO.INFO82丨Q.KXL.INFO69
L.CWQ.INFO17丨H.XCI.INFO22丨T.KXL.INFO42丨R.KXL.INFO15
K.OEJ.INFO24丨J.CWQ.INFO65丨G.XCI.INFO39丨O.KXL.INFO76
B.KQO.INFO99丨J.KXL.INFO39丨O.CWQ.INFO12丨C.CWQ.INFO90
Q.OEJ.INFO86丨J.KQO.INFO64丨N.KQO.INFO25丨L.XCI.INFO87
S.KQO.INFO18丨R.OEJ.INFO42丨G.KXL.INFO90丨B.OEJ.INFO34
M.CWQ.INFO30丨P.CWQ.INFO64丨J.XCI.INFO68丨R.CWQ.INFO89

🧩 场景

假设我们需要连接两个 Kafka 集群:

  1. Kafka 集群 A:用于订单系统,bootstrap-servers=192.168.1.10:9092
  2. Kafka 集群 B:用于用户系统,bootstrap-servers=192.168.1.11:9092

✅ 步骤 1:添加依赖(pom.xml

代码语言:javascript
代码运行次数:0
运行
复制
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

✅ 步骤 2:配置文件(application.yml

代码语言:javascript
代码运行次数:0
运行
复制
# Kafka 集群 A (订单系统)
kafka:
  order:
    bootstrap-servers: 192.168.1.10:9092
    consumer:
      group-id: order-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  user:
    bootstrap-servers: 192.168.1.11:9092
    consumer:
      group-id: user-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

✅ 步骤 3:配置类(Java Config)

创建一个配置类来定义两个 Kafka 实例的 ProducerFactoryConsumerFactoryKafkaTemplate 和监听器工厂。

代码语言:javascript
代码运行次数:0
运行
复制
package com.example.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    // ==================== Kafka 集群 A (订单系统) ====================

    @Bean("orderProducerFactory")
    @ConfigurationProperties(prefix = "kafka.order.producer")
    public Map<String, Object> orderProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        return props;
    }

    @Bean("orderProducerFactory")
    public ProducerFactory<String, Object> orderProducerFactory() {
        return new DefaultKafkaProducerFactory<>(orderProducerConfigs());
    }

    @Bean("orderKafkaTemplate")
    public KafkaTemplate<String, Object> orderKafkaTemplate() {
        return new KafkaTemplate<>(orderProducerFactory());
    }

    @Bean("orderConsumerFactory")
    @ConfigurationProperties(prefix = "kafka.order.consumer")
    public Map<String, Object> orderConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("orderConsumerFactory")
    public ConsumerFactory<String, Object> orderConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfigs());
    }

    @Bean("orderKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> orderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        return factory;
    }


    // ==================== Kafka 集群 B (用户系统) ====================

    @Bean("userProducerFactory")
    @ConfigurationProperties(prefix = "kafka.user.producer")
    public Map<String, Object> userProducerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092");
        return props;
    }

    @Bean("userProducerFactory")
    public ProducerFactory<String, Object> userProducerFactory() {
        return new DefaultKafkaProducerFactory<>(userProducerConfigs());
    }

    @Bean("userKafkaTemplate")
    public KafkaTemplate<String, Object> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }

    @Bean("userConsumerFactory")
    @ConfigurationProperties(prefix = "kafka.user.consumer")
    public Map<String, Object> userConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.11:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("userConsumerFactory")
    public ConsumerFactory<String, Object> userConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(userConsumerConfigs());
    }

    @Bean("userKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }
}

✅ 步骤 4:使用示例

4.1 发送消息到不同 Kafka 集群

代码语言:javascript
代码运行次数:0
运行
复制
@Service
public class MessageService {

    @Autowired
    @Qualifier("orderKafkaTemplate")  // 注入订单 KafkaTemplate
    private KafkaTemplate<String, Object> orderKafkaTemplate;

    @Autowired
    @Qualifier("userKafkaTemplate")   // 注入用户 KafkaTemplate
    private KafkaTemplate<String, Object> userKafkaTemplate;

    public void sendOrderMessage(String topic, Object message) {
        orderKafkaTemplate.send(topic, message);
    }

    public void sendUserMessage(String topic, Object message) {
        userKafkaTemplate.send(topic, message);
    }
}

4.2 监听不同 Kafka 集群的消息

代码语言:javascript
代码运行次数:0
运行
复制
@Component
public class KafkaListeners {

    // 使用订单系统的监听器工厂
    @KafkaListener(topics = "order-topic", groupId = "order-group",
                   containerFactory = "orderKafkaListenerContainerFactory")
    public void listenOrder(String message) {
        System.out.println("收到订单消息: " + message);
    }

    // 使用用户系统的监听器工厂
    @KafkaListener(topics = "user-topic", groupId = "user-group",
                   containerFactory = "userKafkaListenerContainerFactory")
    public void listenUser(String message) {
        System.out.println("收到用户消息: " + message);
    }
}

✅ 总结

组件

订单系统 (A)

用户系统 (B)

ProducerFactory

@Bean("orderProducerFactory")

@Bean("userProducerFactory")

KafkaTemplate

@Bean("orderKafkaTemplate")

@Bean("userKafkaTemplate")

ConsumerFactory

@Bean("orderConsumerFactory")

@Bean("userConsumerFactory")

Listener Factory

@Bean("orderKafkaListenerContainerFactory")

@Bean("userKafkaListenerContainerFactory")

使用方式

@Qualifier("xxx") + containerFactory = "xxx"

同上


🔁 注意事项

  1. @Qualifier 必须使用:当有多个同类型 Bean 时,必须用 @Qualifier 指定注入哪一个。
  2. containerFactory 指定监听器工厂@KafkaListener 必须通过 containerFactory 属性指定使用哪个监听器工厂。
  3. 序列化器/反序列化器:根据实际消息类型配置(如 JSON、Avro)。
  4. Spring Cloud Stream:如果您使用 Spring Cloud Stream,也可以通过 bindings 配置多个 Binder 来实现类似功能,但原生 Spring Kafka 配置更直观。

这个方案稳定可靠,广泛应用于生产环境。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 🧩 场景
  • ✅ 步骤 1:添加依赖(pom.xml)
  • ✅ 步骤 2:配置文件(application.yml)
  • ✅ 步骤 3:配置类(Java Config)
  • ✅ 步骤 4:使用示例
    • 4.1 发送消息到不同 Kafka 集群
    • 4.2 监听不同 Kafka 集群的消息
  • ✅ 总结
  • 🔁 注意事项
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档