首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spring-Kafka :反序列化kafka消息类时的问题不在“受信任的包”中?

在使用Spring-Kafka进行消息处理时,如果遇到反序列化Kafka消息类不在“受信任的包”中的问题,通常是由于Spring Security的配置导致的。Spring Security默认情况下会对反序列化的类进行安全检查,以防止潜在的安全风险,如反序列化漏洞(例如著名的CVE-2015-4852)。

基础概念

反序列化:将字节流转换回原始对象的过程。 受信任的包:在Spring Security中,允许进行反序列化的类所在的包。

相关优势

  • 安全性:防止不受信任的类被反序列化,减少安全风险。
  • 灵活性:可以根据需要配置哪些包是安全的,哪些不是。

类型与应用场景

  • 默认配置:Spring Security默认不允许反序列化不在受信任包中的类。
  • 自定义配置:根据应用需求,可以自定义受信任的包列表。

问题原因

当Kafka消息类不在Spring Security配置的受信任包中时,反序列化操作会失败,并抛出安全异常。

解决方法

可以通过以下几种方式解决这个问题:

方法一:配置受信任的包

在Spring Security配置中明确指定允许反序列化的包。

代码语言:txt
复制
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            // ... 其他配置 ...
            .headers()
                .frameOptions().sameOrigin()
                .and()
            .csrf().disable()
            .and()
            .authorizeRequests()
                .anyRequest().authenticated()
                .and()
            .sessionManagement()
                .sessionCreationPolicy(SessionCreationPolicy.STATELESS);
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);

        // 设置反序列化器
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setClientIdPrefix("client-id-");
        factory.getContainerProperties().setTransactionIdPrefix("tx-id-");

        // 配置受信任的包
        factory.setBeanFactory(new DefaultListableBeanFactory() {
            @Override
            protected void customizeBeanFactory(DefaultListableBeanFactory beanFactory) {
                super.customizeBeanFactory(beanFactory);
                beanFactory.addBeanPostProcessor(new BeanPostProcessor() {
                    @Override
                    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                        return bean;
                    }

                    @Override
                    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                        if (bean instanceof KafkaMessageConverter) {
                            ((KafkaMessageConverter) bean).setTrustedPackages(Arrays.asList("com.example.package"));
                        }
                        return bean;
                    }
                });
            }
        });

        return factory;
    }
}

方法二:使用@KafkaListener注解的properties属性

@KafkaListener注解中直接指定受信任的包。

代码语言:txt
复制
@KafkaListener(topics = "test-topic", properties = {
    "security.protocol=SASL_PLAINTEXT",
    "sasl.mechanism=PLAIN",
    "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
    "allow.auto.create.topics=true",
    "auto.offset.reset=earliest",
    "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer",
    "value.deserializer.org.springframework.kafka.support.serializer.JsonDeserializer.trustedPackages=com.example.package"
})
public void listen(ConsumerRecord<String, MyMessage> record) {
    // 处理消息
}

方法三:自定义反序列化器

实现自定义的反序列化器,并在其中指定受信任的包。

代码语言:txt
复制
public class CustomDeserializer implements Deserializer<MyMessage> {

    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
             ObjectInput in = new ObjectInputStream(bis)) {
            in.setObjectInputFilter(filterInfo -> {
                if (filterInfo.serialClass() != null && filterInfo.serialClass().getName().startsWith("com.example.package")) {
                    return ObjectInputFilter.Status.ALLOWED;
                } else {
                    return ObjectInputFilter.Status.REJECTED;
                }
            });
            return (MyMessage) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize message", e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 配置相关参数
    }

    @Override
    public void close() {
        // 关闭资源
    }
}

然后在Kafka配置中使用这个自定义的反序列化器。

代码语言:txt
复制
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
    return props;
}

通过以上方法,可以有效解决Spring-Kafka中反序列化消息类不在受信任包中的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Apache Kafka-SpringBoot整合Kafka发送复杂对象

的自动化配置的支持,但没有提供 spring-boot-kafka-starter 包… ---- 配置文件 spring: # Kafka 配置项,对应 KafkaProperties 配置类...特别说明一下: 生产者 的value-serializer 配置了 Spring-Kafka 提供的 JsonSerializer 序列化类, 使用 JSON 的方式,序列化复杂的 Message 消息...消费者的 value-serializer 配置,同样使用了 JsonDeserializer 反序列化类,因为稍后我们要使用 JSON 的方式,反序列化复杂的 Message 消息。...因为 JsonDeserializer 在反序列化消息时,考虑到安全性,只反序列化成信任的 Message 类。...务必配置 在序列化时,使用了 JsonSerializer 序列化 Message 消息对象,它会在 Kafka 消息 Headers 的 TypeId 上,值为 Message 消息对应的类全名。

2.2K21

kafka第三次课!

其实就没用了 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer...=1000 当kafka中没有初始offset或offset超出范围时将自动重置offset earliest:重置为分区中最小的offset; latest:重置为分区中最新的offset(消费分区中新产生的数据...=180000 Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer...其实就没用了 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer...=180000 Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

9810
  • 「首席架构师看Event Hub」Kafka的Spring 深入挖掘 -第1部分

    SeekToCurrentErrorHandler丢弃轮询()中的剩余记录,并在使用者上执行查找操作来重置偏移量,以便在下一次轮询时再次获取被丢弃的记录。...此反序列化器包装委托反序列化器并捕获任何异常。然后将它们转发给侦听器容器,后者将它们直接发送给错误处理程序。异常包含源数据,因此可以诊断问题。...消息转换器bean推断要转换为方法签名中的参数类型的类型。 转换器自动“信任”类型。Spring Boot自动将转换器配置到侦听器容器中。...此外,由于我们没有推断类型,所以需要将消息转换器配置为“信任”映射类型的包。 在本例中,我们将在两端使用消息转换器(以及StringSerializer和StringDeserializer)。...注意,我们必须告诉它使用TYPE_ID头来确定转换的类型。同样,Spring Boot会自动将消息转换器配置到容器中。下面是应用程序片段中的生产端类型映射。

    1.5K40

    Apache Kafka-消费端消费重试和死信队列

    当消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 重新消费消息 。...默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。...我们在应用中可以对死信队列中的消息进行监控重发,来使得消费者实例再次进行消费,消费端需要做幂等性的处理。...# 消息的 key 的序列化 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #...消息的 value 的序列化 # Kafka Consumer 配置项 consumer: auto-offset-reset: earliest # 设置消费者分组最初的消费进度为

    12.9K41

    SpringBoot 整合 Spring-Kafka 深度探秘,踩坑实战

    除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。...不过这些我们在Kafka安装包配置文件中的配置项,在注解参数中都可以配置,下面详解下@EmbeddedKafka注解中的可设置参数 : value:broker节点数量count:同value作用一样,...事务消息 默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。...除了上面谈到的通过手动Ack模式来控制消息偏移量外,其实Spring-kafka内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。...Spring-kafka的各种用法,发现了很多好玩很酷的特性,比如,一个注解开启嵌入式的Kafka服务、像RPC调用一样的发送\响应语义调用、事务消息等功能。

    4.2K20

    实战:彻底搞定 SpringBoot 整合 Kafka(spring-kafka深入探秘)

    除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。...不过这些我们在Kafka安装包配置文件中的配置项,在注解参数中都可以配置,下面详解下@EmbeddedKafka注解中的可设置参数 : value:broker节点数量 count:同value作用一样...事务消息 默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。...除了上面谈到的通过手动Ack模式来控制消息偏移量外,其实Spring-kafka内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。...Spring-kafka的各种用法,发现了很多好玩很酷的特性,比如,一个注解开启嵌入式的Kafka服务、像RPC调用一样的发送\响应语义调用、事务消息等功能。

    51.2K76

    集成到ACK、消息重试、死信队列

    除了简单的收发消息外,Spring-kafka 还提供了很多高级功能,下面我们就来一一探秘这些用法。...不过这些我们在 Kafka 安装包配置文件中的配置项,在注解参数中都可以配置,下面详解下 @EmbeddedKafka 注解中的可设置参数 : value:broker 节点数量 count:同 value...事务消息 默认情况下,Spring-kafka 自动生成的 KafkaTemplate 实例,是不具有事务消息发送能力的。...除了上面谈到的通过手动 Ack 模式来控制消息偏移量外,其实 Spring-kafka 内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。...,所以系统性的探索了下 Spring-kafka 的各种用法,发现了很多好玩很酷的特性,比如,一个注解开启嵌入式的 Kafka 服务、像 RPC 调用一样的发送、响应语义调用、事务消息等功能。

    3.5K50

    Kafka 客户端开发

    1 开发概述 Kafka 中,客户端与服务端是通过 TCP 协议进行的; Kafka 公布了所有功能协议(与特定语言无关),并把 Java 客户端作为 kafka 项目的一部分进行维护。...Kafka 提供了五类 API: Producer API: 向主题(一个或多个)发布消息; Consumer API: 订阅主题(一个或多个),拉取这些主题上发布的消息; Stream API: 作为流处理器..."); // [必填] KEY 的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer..."); // [必填] VALUE 的序列化类 // props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner..."); // [必填] KEY 的反序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer

    1.2K40

    一次机房停电引发的思考

    broker,前一天晚上机房停电导致 leader 节点挂了),导致网关的反爬过滤器里面发送 kafka 消息的代码 kafkaTemplat.send 阻塞了 60s,当时在想这个 send 方法不是异步的吗...函数得到对应的 leader 时,最大的等待时间,默认值为 60 秒 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽 buffer.memory 这个缓存空间。...//kafka.apache.org/10/documentation.html[4] 虽然调整一些参数,但是 kafka 集群不可用或请求量过大时,还是对主流程有短暂的阻塞 方案 2:真异步 kafkaTemplat.send...producer send 异步发送耗时问题的分析[5]》说多线程高并发下 producer.send 的损耗比较严重,这个还要等到后续压测之后再更新文章吧 参考文章 站在巨人的肩膀上 Kafka producer...异步发送在某些情况会阻塞主线程,使用时候慎重[6] HAVENT 原创 Spring Boot + Spring-Kafka 异步配置[7] 关于高并发下 kafka producer send 异步发送耗时问题的分析

    79030

    springboot 之集成kafka

    本章只介绍springboot微服务集成kafka,跟rabbitmq用法相同,作为一个消息中间件收发消息使用,本章仅介绍集成后的基础用法,研究不深,请各位谅解。...环境准备 IntelliJ IDEA 前一章中搭建的微服务框架 前一章之后,对目录结构进行了优化,将config相关类都放到demo.config包下 开始集成 pom.xml中增加依赖包...artifactId>spring-kafka 加入依赖包后最好先执行mvn clean install编一把,把所需依赖包下下来,...retries: 0 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。...# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

    56430

    深入Spring Boot (十三):整合Kafka详解

    整合Kafka 使用IDEA新建项目,选择maven管理依赖和构建项目,在pom.xml中添加spring-boot-starter和spring-kafka依赖配置,项目中会使用单元测试检查整合是否正确...,application.properties中参数会在应用启动时被加载解析并初始化,更多生产者和消费者的参数配置请查阅官方文档。...=0# 生产者消息key和消息value的序列化处理类spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer...testGroup# 消费者消息key和消息value的序列化处理类spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer...压缩包中Kafka脚本在Unix和Windows平台是不同的,下面使用到的相关命令,如果在Unix平台下请使用bin/,如果在Windows平台下请使用bin\windows\,并且脚本扩展名分别为.bat

    1.7K20

    当Spring邂逅Kafka,有趣的知识增加了

    2.2 配置Topic 我们先来回顾下什么是topic: 在 Kafka 中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为 topic 。...initialOffset被设置为0,所以每次初始化这个监听器时,所有之前消耗的0和3分区的消息都会被重新消费。...这需要在ProducerFactory中配置适当的序列化器,在ConsumerFactory中配置解序列化器。 让我们看看一个简单的bean类,我们将把它作为消息发送。...的JSON序列化器和反序列化器使用Jackson库,这也是spring-kafka项目的可选Maven依赖。...总结 在这篇文章中,我们介绍了如何安装Kafka以及Spring支持Apache Kafka的基本情况。我们简单学习了一下用于发送和接收消息的类。

    1.1K10

    如何使用Docker内的kafka服务

    基于Docker可以很轻松的搭建一个kafka集群,其他机器上的应用如何使用这个kafka集群服务呢?本次实战就来解决这个问题。...spring boot:1.5.9.RELEASE spring-kafka:1.3.8.RELEASE 重点介绍 本次实战有几处重点需要注意: spring-kafka和kafka的版本匹配问题,请关注官方文档...:https://spring.io/projects/spring-kafka kafka的kafka的advertised.listeners配置,应用通过此配置来连接broker; 应用所在服务器要配置...的配置,这个参数会写到kafka配置的advertised.listeners这一项中,应用会用来连接broker; 第二,KAFKA_CREATE_TOPICS的配置,表示容器启动时会创建名为"topic001...Consumer类,收到消息后,会将内容内容和消息的详情打印出来: @Component public class Consumer { @KafkaListener(topics = {"topic001

    1.5K30

    SpringBoot3集成Kafka

    ,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案; 二、环境搭建 1、Kafka部署 1、下载安装包:kafka_2.13-3.5.0.tgz 2、配置环境变量...,首先spring-kafka组件选择与boot框架中spring相同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合; 但是该版本使用的是kafka-clients组件的...3.3.2版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients...模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topic和key以及消息主体,实现消息的生产; @RestController public class...编写消息监听类,通过KafkaListener注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle查看topic和消息列表; @Component public

    89820

    Apache Kafka-通过concurrency实现并发消费

    ---- 概述 默认情况下, Spring-Kafka @KafkaListener 串行消费的。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。...举个例子 : 如果设置 concurrency=2 时,Spring-Kafka 就会为该 @KafkaListener标注的方法消费的消息 创建 2个线程,进行并发消费。...然后,每个kafka Consumer 会被单独分配到一个线程中pull 消息, 消费消息 之后,Kafka Broker将Topic RRRR 分配给创建的 2个 Kafka Consumer 各 1...Spring-Kafka 提供的并发消费,需要创建多个 Kafka Consumer 对象,并且每个 Consumer 都单独分配一个线程,然后 Consumer 拉取完消息之后,在各自的线程中执行消费...消息的 value 的序列化 # Kafka Consumer 配置项 consumer: auto-offset-reset: earliest # 设置消费者分组最初的消费进度为

    7.5K20

    Spring Boot Kafka概览、配置及优雅地实现发布订阅

    创建DefaultKafkaProducerFactory时,可以通过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例可以传递给...以前,容器线程在consumer.poll()方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象表明有问题。...、生产者、流式处理中都可以单独配置SSL(可能是微服务部署,消费者和生产者不在同一个应用中)。...spring.kafka.consumer.isolation-level # 密钥的反序列化程序类 spring.kafka.consumer.key-deserializer # 在对poll()的单个调用中返回的最大记录数...spring.kafka.consumer.ssl.trust-store-type # 值的反序列化程序类。

    15.7K72
    领券