Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >【spring-kafka】@KafkaListener详解与使用

【spring-kafka】@KafkaListener详解与使用

作者头像
全栈程序员站长
发布于 2022-11-04 09:12:32
发布于 2022-11-04 09:12:32
2K00
代码可运行
举报
运行总次数:0
代码可运行

Kafka高质量专栏请看 石臻臻的杂货铺的Kafka专栏

说明

  • 从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;
  • 可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。 比如:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3


@KafkaListener详解

id 监听器的id

①. 消费者线程命名规则

填写:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的监听器ID不能重复

否则会报错

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
③.会覆盖消费者工厂的消费组GroupId

假如配置文件属性配置了消费组kafka.consumer.group-id=BASE-DEMO 正常情况它是该容器中的默认消费组 但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}) 那么当前消费者的消费组就是consumer-id7 ;

当然如果你不想要他作为groupId的话 可以设置属性idIsGroup = false;那么还是会使用默认的GroupId;

④. 如果配置了属性groupId,则其优先级最高
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代码中最终这个消费者的消费组GroupId是 “groupId-test”

该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。

groupId 消费组名

指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id

如何获取消费者 group.id

在监听器中调用KafkaUtils.getConsumerGroupId()可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;

topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

可以同时监听多个 topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一)

topicPartitions 显式分区分配

可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@KafkaListener(id = "thing2", topicPartitions =
        { 
    @TopicPartition(topic = "topic1", partitions = { 
    "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) { 
   
    ...
}

上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

errorHandler 异常处理

实现KafkaListenerErrorHandler; 然后做一些异常处理;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}

调用的时候 填写beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 监听器工厂

指定生成监听器的工厂类;

例如我写一个 批量消费的工厂类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /** * 监听器工厂 批量消费 * @return */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() { 
   
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客户端前缀

会覆盖消费者工厂的kafka.consumer.client-id属性; 最为前缀后面接 -n n是数字

concurrency并发数

会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看 属性concurrency的作用及配置(RoundRobinAssignor 、RangeAssignor)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /** * 监听器工厂 * @return */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() { 
   
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

虽然使用的工厂是concurrencyFactory(concurrency配置了6); 但是他最终生成的监听器数量 是1;

properties 配置其他属性

kafka中的属性看org.apache.kafka.clients.consumer.ConsumerConfig ; 同名的都可以修改掉;

用法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = { 
   
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 获取所有注册的监听器
        registry.getAllListenerContainers();

设置入参验证器

当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer { 
   

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { 
   
      registrar.setValidator(this.validator);
    }
}

使用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) { 
   
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() { 
   
    return (m, e) -> { 
   
        ...
    };
}

TODO…

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/181902.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年10月14日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
SpringBoot集成kafka全面实战「建议收藏」
本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下《大白话kafka架构原理》、《秒懂kafka HA(高可用)》两篇文章。
全栈程序员站长
2022/07/01
5.4K1
SpringBoot集成kafka全面实战「建议收藏」
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器,
鱼找水需要时间
2023/02/16
3.4K0
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
Apache Kafka-通过concurrency实现并发消费
默认情况下, Spring-Kafka @KafkaListener 串行消费的。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。
小小工匠
2021/08/17
7.7K0
搭建单体SpringBoot项目 集成Kafka消息队列
通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。
郭顺发
2023/07/07
5610
SpringBoot 整合 Spring-Kafka 深度探秘,踩坑实战
kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。
搜云库技术团队
2019/11/21
4.3K0
聊聊如何实现一个带幂等模板的Kafka消费者
不知道大家有没有这样的体验,你跟你团队的成员,宣导一些开发时注意事项,比如在使用消息队列时,在消费端处理消息时,需根据业务场景,考虑一下幂等。后面走查代码的时,会发现一些资浅的开发,在需要幂等判断的场景的情况下,仍然没做幂等判断。既然宣导无效,就干脆实现一个带幂等模板的消费者,然后开发基于这个模板进行消费端业务处理。本文就以spring-kafka举例,聊聊如何实现一个带幂等模板的kafka消费者
lyb-geek
2022/05/24
1.2K0
聊聊如何实现一个带幂等模板的Kafka消费者
【spring-kafka】属性concurrency的作用及如何配置(RoundRobinAssignor 、RangeAssignor)
这个得看我们给Topic设置的分区数量; 总的来说就是 机器数量*concurrency <= 分区数
石臻臻的杂货铺[同名公众号]
2021/07/14
5.5K0
Spring Boot整合kafka
此处简单记录一下 SpringBoot 和 Kafka 的整合。 先初始化一个SpringBoot工程
闻说社
2025/04/17
950
Spring Boot整合kafka
当Spring邂逅Kafka,有趣的知识增加了
Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。
翊君
2022/03/08
1.1K0
Spring Boot Kafka概览、配置及优雅地实现发布订阅
本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net
别打名名
2019/12/24
15.8K0
Spring Boot Kafka概览、配置及优雅地实现发布订阅
springboot kafka集成(实现producer和consumer)
本文介绍如何在springboot项目中集成kafka收发message。 1、先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</ve
用户1225216
2018/03/05
3.5K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
Spring Kafka 是 Spring Framework 提供的一个集成 Apache Kafka 的库,用于构建基于 Kafka 的实时数据流处理应用程序。Apache Kafka 是一个高性能、分布式的流数据平台,广泛用于构建可扩展的、实时的数据处理管道。
苏泽
2024/03/10
1.1K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
在实际应用中,往往需要根据业务需求动态开启/关闭Kafka消费者监听。例如,在某些时间段内,可能需要暂停对某个Topic的消费,或者在某些条件下才开启对某个Topic的消费。
小小工匠
2023/06/04
4.6K0
Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现
Spring Kafka:@KafkaListener 单条或批量处理消息
来源:csdn.net/ldw201510803006/article/details/116176711 消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 图片 可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作; Liste
程序猿DD
2022/05/05
2.3K0
Spring Kafka:@KafkaListener 单条或批量处理消息
聊聊KafkaListener的实现机制
org/springframework/kafka/annotation/KafkaListener.java
code4it
2023/10/22
8150
Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
KafkaBootstrapConfiguration的主要功能是创建两个bean
小小工匠
2021/08/17
8260
Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
【kafka异常】使用Spring-kafka遇到的坑
有想进滴滴LogI开源用户群的加我个人微信: jjdlmn_ 进群(备注:进群) 群里面主要交流 kakfa、es、agent、LogI-kafka-manager、等等相关技术; 群内有专人解答你的问题 对~ 相关技术领域的解答人员都有; 你问的问题都会得到回应
石臻臻的杂货铺[同名公众号]
2021/07/14
6.3K0
【真实生产案例】SpringBoot 整合 Kafka 实现数据高吞吐
在上篇文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。
Java极客技术
2022/12/02
1.2K0
聊聊如何利用kafka实现请求-响应模式
在大多数场景中,我们经常使用kafka来做发布-订阅,在发布-订阅模型中,消息一旦发送就不再追踪后续处理,但在某些业务场景下,我们希望在发送消息后等待一个响应,然后根据这个响应来做我们后续的操作。在这种请求-响应模式,我们就可以利用spring kafka的ReplyingKafkaTemplate来实现
lyb-geek
2024/12/03
1420
聊聊如何利用kafka实现请求-响应模式
springboot中使用kafka
kafka 的事务是从0.11 版本开始支持的,kafka 的事务是基于 Exactly Once 语义的,它能保证生产或消费消息在跨分区和会话的情况下要么全部成功要么全部失败
六个核弹
2021/07/26
3.1K0
推荐阅读
相关推荐
SpringBoot集成kafka全面实战「建议收藏」
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验