这篇文章主要介绍了Reactor中的onErrorContinue 和 onErrorResume,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下
前言
这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorContinue 时,onErrorResume 会在它旁边弹出。让我把我的测试代码和我的一些解释粘贴在下面。
1 基础功能
这是一个简单的函数,将 5 个连续的数字分别乘以 2,然后相加,当 i==2 时抛出一个异常:
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
显然,输出如下:
input=1
input=2
Exception in thread "main" java.lang.ArithmeticException: / by zero
2 只有 onErrorResume ()
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.onErrorResume(err -> {
log.info("onErrorResume");
return Flux.empty();
})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
输出如下:
input=1
input=2
17:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResume
sum=2
正如官方文档描述(onErrorResume)的那样,onErrorResume
用返回内容替换 Flux,因此在 2 之后的数据不会被处理。唯一值得一提的是,onErrorResume()
不必马上在错误之后捕获异常。在 onErrorContinue()
的官网文档中(onErrorContinue),只有 onErrorContinue()
强调了了 upstream 关键词,但显然,它还有其他含义。
3 只有 onErrorContinue()
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
输出:
input=1
input=2
17:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26
显然,onErrorContinue
会丢弃错误数据 2, 然后继续数字 3 直到 5。
4 onErrorResume() 然后 onErrorContinue()
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.map(i -> i == 2 ? i / 0 : i)
.map(i -> i * 2)
.onErrorResume(err -> {
log.info("onErrorResume");
return Flux.empty();
})
.onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
输出和上面一样:
input=1
input=2
17:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26
这样的结果,你想到了吗?onErrorContinue()
会在 onErrorResume()
得到错误之前处理这个错误。当两个错误处理函数在同一个函数中的时候很明显,但是当你的函数中只有 onErrorResume()
,而一些调用者实际上有 onErrorContinue()
时,你的 onErrorResume()
没有被调用的原因可能就不那么明显了。
5 使用 onErrorResume() 模拟 onErrorContinue()
有些博客建议我们完全不用 onErrorContinue()
,且在所有场景中仅用 onErrorResume()
。但是上述示例已经展示了它们会产生不同的结果。那我们怎么实现呢?
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.flatMap(i -> Mono.just(i)
.map(j -> j == 2 ? j / 0 : j)
.map(j -> j * 2)
.onErrorResume(err -> {
System.out.println("onErrorResume");
return Mono.empty();
})
)
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
因此,本质上是将可能在 flatMap 或 concatMap 中抛出错误的操作包装起来,并在其上使用 onErrorResume()
。这样,它会产生相同的结果:
input=1
input=2
onErrorResume
input=3
input=4
input=5
sum=26
6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue()
有时候,onErrorContinue() 放在调用程序中,您无法控制它。但你仍然需要 onErrorResume()。你该怎么办?
public static void main(String... args) {
Flux.range(1,5)
.doOnNext(i -> System.out.println("input=" + i))
.flatMap(i -> Mono.just(i)
.map(j -> j == 2 ? j / 0 : j)
.map(j -> j * 2)
.onErrorResume(err -> {
System.out.println("onErrorResume");
return Mono.empty();
})
.onErrorStop()
)
.onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
.reduce((i,j) -> i+j)
.doOnNext(i -> System.out.println("sum=" + i))
.block();
}
秘诀是在 onErrorResume() 代码块的末尾添加 onErrorStop() ——这会阻塞 onErrorContinue(),这样它就不会在 onErrorResume() 之前占用错误。尝试删除 onErrorStop(),你会看到 onErrorContinue() 在 onErrorResume 之前弹出。
到此这篇关于Reactor中的onErrorContinue 和 onErrorResume的文章就介绍到这了,更多相关Reactor onErrorContinue 内容请搜索编程学习网以前的文章希望大家以后多多支持编程学习网!
本文标题为:Reactor中的onErrorContinue 和 onErrorResume
基础教程推荐
- Java并发编程进阶之线程控制篇 2023-03-07
- Java实现查找文件和替换文件内容 2023-04-06
- Java文件管理操作的知识点整理 2023-05-19
- Java数据结构之对象比较详解 2023-03-07
- java实现多人聊天系统 2023-05-19
- Java实现线程插队的示例代码 2022-09-03
- JDK数组阻塞队列源码深入分析总结 2023-04-18
- springboot自定义starter方法及注解实例 2023-03-31
- java基础知识之FileInputStream流的使用 2023-08-11
- ConditionalOnProperty配置swagger不生效问题及解决 2023-01-02