Spring cloud Kafka Stream StreamsUncaughtExceptionHandler(春云Kafka StreamsUncaughtExceptionHandler)
问题描述
我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。
配置Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {;
factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
};
}
自定义异常处理程序:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
@Autowired
private StreamBridge streamBridge;
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
流处理函数:
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
return kStream -> kStream
.filter((key, value) -> value != null)
.filter((key, value) -> {
Optional<Output> outputResult = service.process(value);
if (outputResult.isPresent()) {
result.set(new KeyValue<>(key, outputResult.get()));
return true;
}
return false;
})
.map((messageKey, messageValue) -> result.get());
}
我希望UnCaughtExceptionHandler处理由service.process()方法引发的任何异常。但是异常永远不会进入Handle方法;相反,它们传播到根并杀死客户端。我也看过this solution,但我想以更独立的方式处理它。
问题:如何使用StreamsUncaughtExceptionHandler处理任何处理异常?
- Spring Boot版本:2.6.3
- 春云溪流版本:3.2.1
- Spring-Cloud-Stream-Binder-Kafka-Streams:3.2.1
- Kafka-Streams:3.0.0
可复制示例:spring-cloud-kafka-streams-exception
推荐答案
以下是您可以尝试的几种方法。
尝试在
StreamsBuilderFactoryBean
中的this line处设置断点,并查看配置的值是什么。这应该会给出一些线索。我注意到您在配置的Impl中为订单设置了
Integer.MAX_VALUE
。默认情况下,StreamsBuilderFactoryBean
使用阶段值Integer.MAX_VALUE - 1000
,因此在工厂Bean准备启动时,配置器可能还不可用,因为Integer.MAX_VALUE
的优先级较低。您可以将订单更改为类似Integer.MAX_VALUE - 5000
的内容,以确保在启动工厂Bean之前完全实例化配置Bean。
这篇关于春云Kafka StreamsUncaughtExceptionHandler的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:春云Kafka StreamsUncaughtExceptionHandler
基础教程推荐
- 在螺旋中写一个字符串 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01