Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >【spring-kafka】@KafkaListener详解与使用

【spring-kafka】@KafkaListener详解与使用

作者头像
石臻臻的杂货铺[同名公众号]
发布于 2021-07-14 02:33:05
发布于 2021-07-14 02:33:05
22.1K00
代码可运行
举报
文章被收录于专栏:kafka专栏kafka专栏
运行总次数:0
代码可运行

说明

  • 从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;
  • 可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。 比如:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3


@KafkaListener详解

id 监听器的id

①. 消费者线程命名规则

填写:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的监听器ID不能重复

否则会报错

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
③.会覆盖消费者工厂的消费组GroupId

假如配置文件属性配置了消费组kafka.consumer.group-id=BASE-DEMO 正常情况它是该容器中的默认消费组 但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}) 那么当前消费者的消费组就是consumer-id7 ;

当然如果你不想要他作为groupId的话 可以设置属性idIsGroup = false;那么还是会使用默认的GroupId;

④. 如果配置了属性groupId,则其优先级最高
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代码中最终这个消费者的消费组GroupId是 “groupId-test”

该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。

groupId 消费组名

指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id

如何获取消费者 group.id

在监听器中调用KafkaUtils.getConsumerGroupId()可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;

topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

可以同时监听多个 topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一)

topicPartitions 显式分区分配

可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

errorHandler 异常处理

实现KafkaListenerErrorHandler; 然后做一些异常处理;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer) {
    	//do someting
        return null;
    }
}

调用的时候 填写beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 监听器工厂

指定生成监听器的工厂类;

例如我写一个 批量消费的工厂类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 监听器工厂 批量消费
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客户端前缀

会覆盖消费者工厂的kafka.consumer.client-id属性; 最为前缀后面接 -n n是数字

concurrency并发数

会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看 属性concurrency的作用及配置(RoundRobinAssignor 、RangeAssignor)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 监听器工厂 
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

虽然使用的工厂是concurrencyFactory(concurrency配置了6); 但是他最终生成的监听器数量 是1;

properties 配置其他属性

kafka中的属性看org.apache.kafka.clients.consumer.ConsumerConfig ; 同名的都可以修改掉;

用法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 获取所有注册的监听器
        registry.getAllListenerContainers();

设置入参验证器

当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

使用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

TODO…

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/01/07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Apache Kafka-消费端消费重试和死信队列
Spring-Kafka 提供消费重试的机制。当消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 重新消费消息 。
小小工匠
2021/08/17
13.2K0
Apache Kafka-事务消息的支持与实现(本地事务)
Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)。
小小工匠
2021/08/17
2.2K0
Apache Kafka-消费端_批量消费消息的核心参数及功能实现
https://kafka.apache.org/24/documentation.html#consumerconfigs
小小工匠
2021/08/17
2.9K0
Apache Kafka-生产者_批量发送消息的核心参数及功能实现
kafka中有个 micro batch 的概念 ,为了提高Producer 发送的性能。
小小工匠
2021/08/17
4.2K0
Apache Kafka-auto.offset.reset参数(earliest、latest、none)含义说明
https://kafka.apache.org/24/documentation.html
小小工匠
2021/08/17
2.3K0
Apache Kafka-SpringBoot整合Kafka发送复杂对象
我们先对 Kafka-Spring 做个快速入门,实现 Producer发送消息 ,同时Consumer 消费消息。
小小工匠
2021/08/17
2.2K0
Apache Kafka-消费端_顺序消费的实现
一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。
小小工匠
2021/08/17
1.2K0
Apache Kafka-通过设置Consumer Group实现广播模式
传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)
小小工匠
2021/08/17
9.1K1
聊聊如何实现一个带幂等模板的Kafka消费者
不知道大家有没有这样的体验,你跟你团队的成员,宣导一些开发时注意事项,比如在使用消息队列时,在消费端处理消息时,需根据业务场景,考虑一下幂等。后面走查代码的时,会发现一些资浅的开发,在需要幂等判断的场景的情况下,仍然没做幂等判断。既然宣导无效,就干脆实现一个带幂等模板的消费者,然后开发基于这个模板进行消费端业务处理。本文就以spring-kafka举例,聊聊如何实现一个带幂等模板的kafka消费者
lyb-geek
2022/05/24
1.2K0
聊聊如何实现一个带幂等模板的Kafka消费者
【spring-kafka】@KafkaListener详解与使用
属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3
石臻臻的杂货铺[同名公众号]
2021/07/14
22.1K1
Spring Boot整合kafka
此处简单记录一下 SpringBoot 和 Kafka 的整合。 先初始化一个SpringBoot工程
闻说社
2025/04/17
950
Spring Boot整合kafka
Apache Kafka-消息丢失分析 及 ACK机制探究
消息丢失得分两种情况 : 生产者 和 消费者 都有可能因处理不当导致消息丢失的情况
小小工匠
2021/08/17
1.9K0
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,
鱼找水需要时间
2023/02/16
3.4K0
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
Spring Boot Kafka 生产者/消费者示例
Spring Boot 是最流行和最常用的 Java 编程语言框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作一个可用于生产的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。下面列出了 Spring boot 的一些主要特性。
用户1418987
2023/10/26
9870
Spring Boot Kafka 生产者/消费者示例
Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?
https://github.com/Snailclimb/springboot-kafka
Guide哥
2020/05/07
1.9K0
Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
KafkaBootstrapConfiguration的主要功能是创建两个bean
小小工匠
2021/08/17
8260
Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。
小小工匠
2023/06/04
4.6K0
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
Kafka及Spring Cloud Stream
下载kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
HUC思梦
2020/09/03
1.2K0
Apache Kafka-生产消费基础篇
POM 依赖 版本请同使用的kafka服务端的版本保持一致 <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kaf
小小工匠
2021/08/17
2940
聊聊KafkaListener的实现机制
org/springframework/kafka/annotation/KafkaListener.java
code4it
2023/10/22
8150
推荐阅读
相关推荐
Apache Kafka-消费端消费重试和死信队列
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验