在使用Spring-Kafka进行消息处理时,如果遇到反序列化Kafka消息类不在“受信任的包”中的问题,通常是由于Spring Security的配置导致的。Spring Security默认情况下会对反序列化的类进行安全检查,以防止潜在的安全风险,如反序列化漏洞(例如著名的CVE-2015-4852)。
反序列化:将字节流转换回原始对象的过程。 受信任的包:在Spring Security中,允许进行反序列化的类所在的包。
当Kafka消息类不在Spring Security配置的受信任包中时,反序列化操作会失败,并抛出安全异常。
可以通过以下几种方式解决这个问题:
在Spring Security配置中明确指定允许反序列化的包。
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
// ... 其他配置 ...
.headers()
.frameOptions().sameOrigin()
.and()
.csrf().disable()
.and()
.authorizeRequests()
.anyRequest().authenticated()
.and()
.sessionManagement()
.sessionCreationPolicy(SessionCreationPolicy.STATELESS);
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, consumerFactory);
// 设置反序列化器
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setClientIdPrefix("client-id-");
factory.getContainerProperties().setTransactionIdPrefix("tx-id-");
// 配置受信任的包
factory.setBeanFactory(new DefaultListableBeanFactory() {
@Override
protected void customizeBeanFactory(DefaultListableBeanFactory beanFactory) {
super.customizeBeanFactory(beanFactory);
beanFactory.addBeanPostProcessor(new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof KafkaMessageConverter) {
((KafkaMessageConverter) bean).setTrustedPackages(Arrays.asList("com.example.package"));
}
return bean;
}
});
}
});
return factory;
}
}
@KafkaListener
注解的properties
属性在@KafkaListener
注解中直接指定受信任的包。
@KafkaListener(topics = "test-topic", properties = {
"security.protocol=SASL_PLAINTEXT",
"sasl.mechanism=PLAIN",
"sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
"allow.auto.create.topics=true",
"auto.offset.reset=earliest",
"key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer",
"value.deserializer.org.springframework.kafka.support.serializer.JsonDeserializer.trustedPackages=com.example.package"
})
public void listen(ConsumerRecord<String, MyMessage> record) {
// 处理消息
}
实现自定义的反序列化器,并在其中指定受信任的包。
public class CustomDeserializer implements Deserializer<MyMessage> {
@Override
public MyMessage deserialize(String topic, byte[] data) {
try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInput in = new ObjectInputStream(bis)) {
in.setObjectInputFilter(filterInfo -> {
if (filterInfo.serialClass() != null && filterInfo.serialClass().getName().startsWith("com.example.package")) {
return ObjectInputFilter.Status.ALLOWED;
} else {
return ObjectInputFilter.Status.REJECTED;
}
});
return (MyMessage) in.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Failed to deserialize message", e);
}
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 配置相关参数
}
@Override
public void close() {
// 关闭资源
}
}
然后在Kafka配置中使用这个自定义的反序列化器。
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
return props;
}
通过以上方法,可以有效解决Spring-Kafka中反序列化消息类不在受信任包中的问题。
领取专属 10元无门槛券
手把手带您无忧上云