spring Kafka model is not in the trusted packages(Spring Kafka模型不在受信任的包中)
问题描述
我正在使用spring-Kafka-2.1.5
和spring-boot-2.0.5
处理微服务
第一个服务将向Kafka生成一些消息,第二个服务将消费这些消息,而消费时我遇到了问题
Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the trusted packages: [java.util, java.lang, com.service2.model.ConsumeMessage].
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
因此,从错误消息来看,这是Service1的com.service1.model.TopicMessage
序列化模型。但我正在尝试将消息反序列化到Service2中的模型com.service2.model.ConsumeMessage
中,并出现此问题
我发现了同样的问题here,并尝试了下面的格式和文档docs
下面是我的配置
@Bean(name = "kafkaConsumerConfig")
public Map<String, Object> kafkaConsumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetconfig);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
}
kafkaConsumer erFactory
@Bean(name = "kafkaConsumerFactory")
public ConsumerFactory<String, ConsumeMessage> kafkaConsumerFactory() {
JsonDeserializer<ConsumeMessage>
deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.service2.model");
return new DefaultKafkaConsumerFactory<String, ConsumeMessage>(kafkaConsumerConfig(),new StringDeserializer(),deserializer);
}
kafkaListenerContainerFactory
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage > kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage > factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(Integer.parseInt(threads));
factory.setBatchListener(true);
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
factory.getContainerProperties().setAckMode(AckMode.BATCH);
return factory;
}
推荐答案
您需要禁用反序列化程序中的header precedence
:
/**
* Construct an instance with the provided target type, and
* useHeadersIfPresent with a default {@link ObjectMapper}.
* @param targetType the target type.
* @param useHeadersIfPresent true to use headers if present and fall back to target
* type if not.
* @since 2.2
*/
public JsonDeserializer(Class<? super T> targetType, boolean useHeadersIfPresent) {
必须将useHeadersIfPresent
参数配置为false
。这样,将使用inferred
类型,并且将忽略标头值。
如果您不使用spring-kafka-2.2
,则应考虑使用类似的逻辑实现您自己的JsonDeserializer
:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java
这篇关于Spring Kafka模型不在受信任的包中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Spring Kafka模型不在受信任的包中
基础教程推荐
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 在螺旋中写一个字符串 2022-01-01