前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >微服务同时接入多个Kafka

微服务同时接入多个Kafka

作者头像
阿提说说
发布2022-11-18 16:43:43
发布2022-11-18 16:43:43
1.3K00
代码可运行
举报
文章被收录于专栏:Java技术进阶Java技术进阶
运行总次数:0
代码可运行

最近在做微服务的迁移改造工作,其中有一个服务需要订阅多个Kafka,如果使用spring kafka自动配置的话只能配置一个Kafka,不符合需求,该文总结了如何配置多个Kafka,希望对您有帮助。

文章目录

准备工作

  • 自己搭建一个Kafka 从官方下载Kafka,选择对应Spring Boot 的版本,好在Kafka支持的版本范围比较广,当前最新版本是3.2.1,支持2.12-3.2.1 范围的版本,覆盖了Spring Boot 2.0x-Spring Boot 3.0.xhttps://kafka.apache.org/downloads
  • 解压安装 进入bin目录,执行如下命令,按照如下顺序启动 Linux
代码语言:javascript
代码运行次数:0
运行
复制
# 配置文件选择自己对应的目录
zookeeper-server-start.sh ../config/zookeeper.properties

Windows

代码语言:javascript
代码运行次数:0
运行
复制
windows/zookeeper-server-start.bat ../config/zookeeper.properties

打开另外一个终端,启动KafkaServer

Linux

代码语言:javascript
代码运行次数:0
运行
复制
kafka-server-start.sh ../config/server.properties

Windows

代码语言:javascript
代码运行次数:0
运行
复制
windows/kafka-server-start.bat ../config/server.properties

最小化配置Kafka

如下是最小化配置Kafka

pom.xml 引入依赖

代码语言:javascript
代码运行次数:0
运行
复制
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

application.properties

代码语言:javascript
代码运行次数:0
运行
复制
server.port=8090
spring.application.name=single-kafka-server

#kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
#消费者分组,配置后,自动创建
spring.kafka.consumer.group-id=default_group

KafkaProducer 生产者

代码语言:javascript
代码运行次数:0
运行
复制
@Slf4j
@Component
@EnableScheduling
public class KafkaProducer {

    @Resource
    private KafkaTemplate kafkaTemplate;

    private void sendTest() {
    	//topic 会自动创建
        kafkaTemplate.send("topic1", "hello kafka");
    }

    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}

KafkaConsumer 消费者

代码语言:javascript
代码运行次数:0
运行
复制
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"topic1"})
    public void processMessage(String spuId) {
        log.warn("process spuId ={}", spuId);
    }

}

运行效果:

多Kafka配置

配置稍微复杂了一点,灵魂就是手动创建,同样引入依赖

pom.xml

代码语言:javascript
代码运行次数:0
运行
复制
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

application.properties

代码语言:javascript
代码运行次数:0
运行
复制
server.port=8090
spring.application.name=kafka-server

#kafka1
#服务器地址
spring.kafka.one.bootstrap-servers=localhost:9092
spring.kafka.one.consumer.group-id=default_group


#kafka2
spring.kafka.two.bootstrap-servers=localhost:9092
spring.kafka.two.consumer.group-id=default_group2

第一个Kafka配置,需要区分各Bean的名称

KafkaOneConfig

代码语言:javascript
代码运行次数:0
运行
复制
@Configuration
public class KafkaOneConfig {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "kafkaOneContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息

kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中,

producerFactory 生产者工厂

consumerFactory 消费者工厂

producerConfigs 生产者配置

consumerConfigs 消费者配置

同样创建第二个Kafka,配置含义,同第一个Kafka

KafkaTwoConfig

代码语言:javascript
代码运行次数:0
运行
复制
@Configuration
public class KafkaTwoConfig {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "kafkaTwoContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

创建一个测试的消费者,注意配置不同的监听容器containerFactory

KafkaConsumer

代码语言:javascript
代码运行次数:0
运行
复制
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
    public void oneProcessItemcenterSpuMessage(String spuId) {
        log.warn("one process spuId ={}", spuId);
    }

    @KafkaListener(topics = {"topic2"},containerFactory = "kafkaTwoContainerFactory")
    public void twoProcessItemcenterSpuMessage(String spuId) {
        log.warn("two process spuId ={}", spuId);
    }
}

创建一个测试的生产者,定时往两个topic中发送消息

KafkaProducer

代码语言:javascript
代码运行次数:0
运行
复制
@Slf4j
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate kafkaOneTemplate;
    @Resource
    private KafkaTemplate kafkaTwoTemplate;

    private void sendTest() {
        kafkaOneTemplate.send("topic1", "hello kafka one");
        kafkaTwoTemplate.send("topic2", "hello kafka two");
    }

    @Scheduled(fixedRate = 1000 * 10)
    public void testKafka() {
        log.info("send message...");
        sendTest();
    }
}

最后运行效果:

其他kafka文章: 【从面试题看源码】-看完Kafka性能优化-让你吊打面试官

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 准备工作
  • 最小化配置Kafka
  • 多Kafka配置
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档