How to convert Listlt;Tgt; to Fluxlt;Tgt; by using Reactor 3.x(如何使用电抗器3.x将LISTlt;T;转换为通量lt;T)
本文介绍了如何使用电抗器3.x将LIST<;T&>;转换为通量<;T&>的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个Asyn Call Thrift接口:
public CompletableFuture<List<Long>> getFavourites(Long userId){
CompletableFuture<List<Long>> future = new CompletableFuture();
OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
callback.addObserver(new OctoObserver() {
@Override
public void onSuccess(Object o) {
future.complete((List<Long>) o);
}
@Override
public void onFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
});
try {
recommendAsyncService.getFavorites(userId, callback);
} catch (TException e) {
log.error("OctoCall RecommendAsyncService.getFavorites", e);
}
return future;
}
现在它返回CompletableFuture<;列表>;。然后我调用它来使用Flux做一些处理器。
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
// do not like it
List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
System.out.println(recommendList);
return Flux.fromIterable(recommendList)
.flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
.userId(userId)
.productId(id)
.productType((int) (Math.random()*100))
.build())))
.take(5)
.publishOn(mdpScheduler);
}
但是,我想从getFavourites
方法中获取一个通量,并且可以在getRecommend
方法中使用它。或者,您可以推荐
Flux API
,我可以将List<Long> recommendList
转换为Flux<Long> recommendFlux
。
推荐答案
要将CompletableFuture<List<T>>
转换为Flux<T>
,可以使用Mono#fromFuture
和Mono#flatMapMany
:
var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
List<T>
在回调中异步接收到的Flux<T>
也可以不使用CompletableFuture
转换为Flux<T>
。
您可以直接使用Mono#create
和Mono#flatMapMany
:
Flux<Long> flux = Mono.<List<Long>>create(sink -> {
Callback<List<Long>> callback = new Callback<List<Long>>() {
@Override
public void onResult(List<Long> list) {
sink.success(list);
}
@Override
public void onError(Exception e) {
sink.error(e);
}
};
client.call("query", callback);
}).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
或简单使用Flux#create
一次多次排放:
Flux<Long> flux = Flux.create(sink -> {
Callback<List<Long>> callback = new Callback<List<Long>>() {
@Override
public void onResult(List<Long> list) {
list.forEach(sink::next);
}
@Override
public void onError(Exception e) {
sink.error(e);
}
};
client.call("query", callback);
});
flux.subscribe(System.out::println);
这篇关于如何使用电抗器3.x将LIST<;T&>;转换为通量<;T&>的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:如何使用电抗器3.x将LIST<;T&>;转换为通量<;T&>
基础教程推荐
猜你喜欢
- 在螺旋中写一个字符串 2022-01-01
- Spring Boot Freemarker从2.2.0升级失败 2022-01-01
- 由于对所需库 rt.jar 的限制,对类的访问限制? 2022-01-01
- 首次使用 Hadoop,MapReduce Job 不运行 Reduce Phase 2022-01-01
- Java 中保存最后 N 个元素的大小受限队列 2022-01-01
- 如何强制对超级方法进行多态调用? 2022-01-01
- 如何在不安装整个 WTP 包的情况下将 Tomcat 8 添加到 Eclipse Kepler 2022-01-01
- 如何使用 Eclipse 检查调试符号状态? 2022-01-01
- 如何对 HashSet 进行排序? 2022-01-01
- 如何使用 Stream 在集合中拆分奇数和偶数以及两者的总和 2022-01-01