How to use @Transactional with @KafkaListener?(如何将@Transaction与@KafkaListener一起使用?)
问题描述
有没有可能将声明性TX管理(通过@Transaction)与@KafkaListener注释方法一起使用? 例如,我想使用它来为每个监听器定义单独的发送超时。 我的设置如下:
TransactionManager:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
KafkaListener:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
问题是-KafkaMessageListenerContainer在调用此类方法之前创建自己的事务-它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
未使用TransactionInterceptor。那么具体的@KafkaListener方法如何设置具体的TX超时时间呢?
推荐答案
可以这样做,但有点复杂,因为您必须将消耗的偏移量发送到Kafka交易。
不使用ChainedKafkaTransactionManager
,您可以为容器使用KafkaTransactionManager
,为HibernateTransactionManager
使用@Transactional
。
这将产生类似的结果,因为Hibernate Tx将在Kafka事务之前提交(如果Hibernate提交失败,则Kafka Tx将回滚)。
编辑
若要将不同的链式TM配置到每个侦听器容器中,可以执行以下操作。
@组件 类ContainerFactoryCustomizer{ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
ChainedKafkaTransactionManager<?, ?> chainedOne,
ChainedKafkaTransactionManager<?, ?> chainedTwo) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().setTransactionManager(chainedOne);
}
else {
container.getContainerProperties().setTransactionManager(chainedTwo);
}
});
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
这篇关于如何将@Transaction与@KafkaListener一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何将@Transaction与@KafkaListener一起使用?
基础教程推荐
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 在螺旋中写一个字符串 2022-01-01