这篇文章主要介绍了RxJava取消订阅的各种方式的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
手动取消订阅
Consumer类型
Observable创建返回Disposable取消
public class SecondActivity extends AppCompatActivity {
private static final String TAG = "SecondActivity";
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_second);
disposable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: "+s);
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
Log.d(TAG, "onDestroy: ");
//取消订阅
if(disposable != null && !disposable.isDisposed()){
disposable.dispose();
Log.d(TAG, "onDestroy: dispose");
}
}
}
普通类型Observer
在Observer中获取Disposable然后取消
public class ThirdActivity extends AppCompatActivity {
private static final String TAG = "ThirdActivity";
Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_third);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
Log.d(TAG, "onDestroy: ");
//然后在需要取消订阅的地方调用即可
if (disposable != null && !disposable.isDisposed()) {
Log.d(TAG, "dispose: ");
disposable.dispose();
}
}
}
DisposableObserver类型
利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消
public class FourthActivity extends AppCompatActivity {
private static final String TAG = "FourthActivity";
private DisposableObserver<String> observer;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_fourth);
observer = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(String o) {
Log.d(TAG, "onNext: "+o);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
if (observer != null && !observer.isDisposed()) {
Log.d(TAG, "dispose: ");
observer.dispose();
}
}
}
取消多个Observer
把多个Observer添加CompositeDisposable,一次取消
public class ComDisposableActivity extends AppCompatActivity {
private Disposable disposable1;
private Disposable disposable2;
private static final String TAG = "ComDisposableActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_com_disposable);
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: Unsubscribing subscription from onCreate()");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable1 = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
Thread.sleep(5000);
emitter.onNext("testInfo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposable2 = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
CompositeDisposable compositeDisposable = new CompositeDisposable();
//批量添加
compositeDisposable.add(disposable1);
compositeDisposable.add(disposable2);
//最后一次性全部取消订阅
compositeDisposable.dispose();
}
}
RxLifecyle取消
OnDestory取消
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
}
})
.compose(this.<Long>bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.d(TAG, "accept: " + num);
}
});
指定生命周期取消
Observable.interval(1,TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
}
}).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "bindUntilEvent accept: " + aLong);
}
});
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程学习网。
沃梦达教程
本文标题为:RxJava取消订阅的各种方式的实现
基础教程推荐
猜你喜欢
- Android开发Compose集成高德地图实例 2023-06-15
- MVVMLight项目Model View结构及全局视图模型注入器 2023-05-07
- iOS开发使用XML解析网络数据 2022-11-12
- iOS开发 全机型适配解决方法 2023-01-14
- iOS Crash常规跟踪方法及Bugly集成运用详细介绍 2023-01-18
- iOS中如何判断当前网络环境是2G/3G/4G/5G/WiFi 2023-06-18
- Flutter进阶之实现动画效果(三) 2022-10-28
- IOS获取系统相册中照片的示例代码 2023-01-03
- Android实现短信验证码输入框 2023-04-29
- Android Compose自定义TextField实现自定义的输入框 2023-05-13