首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Spring-Kafka :反序列化kafka消息类时的问题不在“受信任的包”中?

在使用Spring-Kafka进行消息处理时,如果遇到反序列化Kafka消息类不在“受信任的包”中的问题,通常是由于Spring Security的配置导致的。Spring Security默认情况下会对反序列化的类进行安全检查,以防止潜在的安全风险,如反序列化漏洞(例如著名的CVE-2015-4852)。

基础概念

反序列化:将字节流转换回原始对象的过程。 受信任的包:在Spring Security中,允许进行反序列化的类所在的包。

相关优势

  • 安全性:防止不受信任的类被反序列化,减少安全风险。
  • 灵活性:可以根据需要配置哪些包是安全的,哪些不是。

类型与应用场景

  • 默认配置:Spring Security默认不允许反序列化不在受信任包中的类。
  • 自定义配置:根据应用需求,可以自定义受信任的包列表。

问题原因

当Kafka消息类不在Spring Security配置的受信任包中时,反序列化操作会失败,并抛出安全异常。

解决方法

可以通过以下几种方式解决这个问题:

方法一:配置受信任的包

在Spring Security配置中明确指定允许反序列化的包。

代码语言:txt
复制
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            // ... 其他配置 ...
            .headers()
                .frameOptions().sameOrigin()
                .and()
            .csrf().disable()
            .and()
            .authorizeRequests()
                .anyRequest().authenticated()
                .and()
            .sessionManagement()
                .sessionCreationPolicy(SessionCreationPolicy.STATELESS);
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);

        // 设置反序列化器
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setClientIdPrefix("client-id-");
        factory.getContainerProperties().setTransactionIdPrefix("tx-id-");

        // 配置受信任的包
        factory.setBeanFactory(new DefaultListableBeanFactory() {
            @Override
            protected void customizeBeanFactory(DefaultListableBeanFactory beanFactory) {
                super.customizeBeanFactory(beanFactory);
                beanFactory.addBeanPostProcessor(new BeanPostProcessor() {
                    @Override
                    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                        return bean;
                    }

                    @Override
                    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                        if (bean instanceof KafkaMessageConverter) {
                            ((KafkaMessageConverter) bean).setTrustedPackages(Arrays.asList("com.example.package"));
                        }
                        return bean;
                    }
                });
            }
        });

        return factory;
    }
}

方法二:使用@KafkaListener注解的properties属性

@KafkaListener注解中直接指定受信任的包。

代码语言:txt
复制
@KafkaListener(topics = "test-topic", properties = {
    "security.protocol=SASL_PLAINTEXT",
    "sasl.mechanism=PLAIN",
    "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
    "allow.auto.create.topics=true",
    "auto.offset.reset=earliest",
    "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer",
    "value.deserializer.org.springframework.kafka.support.serializer.JsonDeserializer.trustedPackages=com.example.package"
})
public void listen(ConsumerRecord<String, MyMessage> record) {
    // 处理消息
}

方法三:自定义反序列化器

实现自定义的反序列化器,并在其中指定受信任的包。

代码语言:txt
复制
public class CustomDeserializer implements Deserializer<MyMessage> {

    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
             ObjectInput in = new ObjectInputStream(bis)) {
            in.setObjectInputFilter(filterInfo -> {
                if (filterInfo.serialClass() != null && filterInfo.serialClass().getName().startsWith("com.example.package")) {
                    return ObjectInputFilter.Status.ALLOWED;
                } else {
                    return ObjectInputFilter.Status.REJECTED;
                }
            });
            return (MyMessage) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize message", e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 配置相关参数
    }

    @Override
    public void close() {
        // 关闭资源
    }
}

然后在Kafka配置中使用这个自定义的反序列化器。

代码语言:txt
复制
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
    return props;
}

通过以上方法,可以有效解决Spring-Kafka中反序列化消息类不在受信任包中的问题。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

没有搜到相关的沙龙

领券