熟悉RxJava的知道,onError跟onComplete是互斥的,出现其中一个,观察者与被观察者的关系就被中断(以下简称:管道中断),观察者就永远不会收到来自被观察者发出的事件。
然后有些情况下,出现了错误,我们希望可以进行一些补救措施,例如:
- 由于网络原因或者其他原因,Http请求失败了,这个时候我们希望进行重试,又或者去读取本地的缓存数据
- 在使用RxJava的组合操作符进行Http并发请求时,我们希望接口之间互不影响,即A接口出现错误不会影响B接口的正常流程,反之一样
现实开发中,可能有更多的场景需要对错误进行补救,所以RxJava为我们提供了两大类进行错误处理,分别是Catch和Retry,前者在出现错误时补救,后者在出现错误时重试,接下来,分别对它们进行讲解
注:Catch和Retry只能捕获上游事件的异常
Catch
Catch操作符共有5个,分别是:
onErrorReturnItem(final T item) //内部调用第二个方法
onErrorReturn(Function function) //遇到错误,用默认数据项替代
onErrorResumeNext(final ObservableSource<? extends T> next) //内部调用第四个方法
onErrorResumeNext(Function resumeFunction ) //遇到错误,开始发射新的Observable的数据序列
onExceptionResumeNext(final ObservableSource<? extends T> next) //内部原理与第四个相同,仅有一个参数不同
复制代码
虽然有5个操作符,但是实际上就只有3个,再准确点说就只有2个,为什么这么说呢,因为第1个操作符内部调用的就是第2个,而第3个操作符内部调用是第4个操作符,所以说只有3个,那为什么准确点说只有2个呢,因为第5个操作符,内部原理同第四个,仅仅有一个参数传的不一样,接下来我们分别讲解。
onErrorReturnItem
Disposable disposable = Observable.fromCallable(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return null; //返回null,即出现错误}}).onErrorReturnItem(100) //出现错误时,用一个默认的数据项将其替代.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {// 打印 100 随后会立即回调onComplete}});复制代码
我们再来看看onErrorReturnItem
内部实现
public final Observable<T> onErrorReturnItem(final T item) {ObjectHelper.requireNonNull(item, "item is null");//将item封装成Function对象,并调用onErrorReturn方法return onErrorReturn(Functions.justFunction(item));
}
复制代码
onErrorReturn
内部源码较为简单,这里不做讲解,接下来看看onErrorReturn
如何使用
onErrorReturn
Disposable disposable = Observable.fromCallable(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return null; //返回null,即出现错误}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {//出现错误时,用一个默认的数据项将其替代,这里根据不同的错误返回不同的数据项return 100; }}) .subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {// 打印 100 随后会立即回调onComplete}});
复制代码
看到上面代码可以明白,onErrorReturn
的作用就是在出现错误的时候,用一个默认的数据项将错误替代,并立刻回调onComplete。
onErrorResumeNext(final ObservableSource next)
public final Observable<T> onErrorResumeNext(final ObservableSource<? extends T> next) {ObjectHelper.requireNonNull(next, "next is null");//可以看到这里将Observable对象封装成Function对象,并调用onErrorResumeNext方法return onErrorResumeNext(Functions.justFunction(next));
}
复制代码
看源码知道onErrorResumeNext(final ObservableSource next)
内部调用了onErrorResumeNext(Function resumeFunction )
故这里不再讲解
onErrorResumeNext(Function resumeFunction )
onErrorResumeNext
的作用就是在遇到错误时开始发射第二个Observable的数据序列,看代码
Disposable disposable = Observable.fromCallable(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return null; //返回null,即出现错误}}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {//出现错误时开始发射新的Observable的数据序列return Observable.just(1, 2, 3);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {// 打印 1、2、3 随后会立即回调onComplete}});
复制代码
onExceptionResumeNext
onExceptionResumeNext
跟onErrorResumeNext
作用相同,都是在遇到错误时开始发射第二个Observable的数据序列,不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable,即它只能捕获Exception异常,这一点,我们可以在ObservableOnErrorNext$OnErrorNextObserver
类中源码看到
//使用onExceptionResumeNext操作符时,allowFatal为true
//使用onErrorResumeNext操作都是,allowFatal为false
if (allowFatal && !(t instanceof Exception)) {//非Exception异常,直接交给观察者的onError方法actual.onError(t); return;
}
复制代码
以上就是Catch操作符的介绍,处理原理无非就两种,第一种用一个默认的数据项替代错误,第二种在遇到错误时开始发射一个新的Observable的数据序列,Catch操作符就讲解到这,如需要知道具体业务场景,可以看这里HttpSender 介绍篇之多请求串行与并行(五)
Retry
Retry顾名思义就是在出现错误的时候进行重试,共有7个操作符,如下
retry() //无条件,重试无数次
retry(long times) //无条件,重试times次
retry(Predicate predicate) //根据条件,重试无数次
retryUntil(final BooleanSupplier stop) //根据条件,重试无数次
retry(long times, Predicate predicate) //根据条件,重试times次
retry(BiPredicate predicate) //功能与上一个一样,实现不同
retryWhen(final Function handler) //可以实现延迟重试n次
复制代码
前4个操作符内部都调用第五个retry(long times, Predicate predicate)
(需要注意的是retryUntil
操作符,只有接口里的方法返回false时,才会重试),所以我们直接从第五个开始
retry(long times, Predicate predicate)
Disposable disposable = Observable.fromCallable(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {//这里会执行n+1次,其中n为重试次数(如果重试条件为true的话)return null; //返回null,即出现错误}}).retry(3, new Predicate<Throwable>() {//重试3次@Overridepublic boolean test(Throwable throwable) throws Exception {//true 代表需要重试,可根据throwable对象返回是否需要重试return true;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {//重试成功,走这里}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {//重试n次后,依然出现错误,直接会走到这里}});
复制代码
上面注释很详情,这里不再讲解。
retry(BiPredicate predicate)
Disposable disposable = Observable.fromCallable(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {//这里会执行n+1次,其中n为重试次数(如果重试条件为true的话)return null; //返回null,即出现错误}}).retry(new BiPredicate<Integer, Throwable>() {@Overridepublic boolean test(Integer times, Throwable throwable) throws Exception {//times 为尝试次数,即第几次尝试return times <= 3; //只允许尝试3次}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {//重试成功,走这里}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {//重试n次后,依然出现错误,直接会走到这里}});
复制代码
retry(BiPredicate predicate)
跟retry(long times, Predicate predicate)
功能上是一样的,只是实现不一样而已。注释很详情,也不再讲解。
retryWhen(final Function handler)
先来看看官网的描述:retryWhen
将onError
中的Throwable
传递给一个函数,这个函数产生另一个Observable
,retryWhen
观察它的结果再决定是不是要重新订阅原始的Observable
。如果这个Observable
发射了一项数据,它就重新订阅,如果这个Observable
发射的是onError
通知,它就将这个通知传递给观察者然后终止。
这段话的大致意思就是,如果RxJava内部传过来的Observable
(retryWhen
方法传入的接口,通过接口方法传过来的)发射了一项数据,即发射onNext事件,就会重新订阅原始的Observable
,如果发射的是onError
事件,它就将这个事件传递给观察者然后终止。
那么,retryWhen
有什么作用呢,它的主要作用出现错误时,重新订阅,即重试,它跟之前的retry
操作符最大的区别就是,它可以延迟重试,例如,我们有这样一个需求,需要在遇到错误是,隔3秒重试一次,最多重试3次,先来看看代码
Disposable disposable = Observable.fromCallable(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {Log.d("LJX", "call");//这里会执行n+1次,其中n为重试次数return null; //返回null,即出现错误}}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> attempts) throws Exception {//注:这里需要根据RxJava传递过来的Observable对象发射事件,不能直接返回一个新的Observable,否则无效return attempts.flatMap(new Function<Throwable, ObservableSource<?>>() {private final int maxRetries = 3; //最多重试三次private final int retryDelayMillis = 3; //隔3秒重试一次private int retryCount; //当前重试次数@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {Log.d("LJX", "apply retryCount=" + retryCount);//每次遇到错误,这里都会回调一次if (++retryCount <= maxRetries) { //最多重试三次return Observable.timer(retryDelayMillis, TimeUnit.SECONDS);}return Observable.error(throwable); //第四次还错,就直接发射onError事件}});}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {//重试成功,走这里Log.d("LJX", "onNext ");}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {//重试n次后,依然出现错误,直接会走到这里Log.d("LJX", "onError");}});
复制代码
日志打印
2019-01-29 17:18:19.764 11179-11179/com.example.httpsender D/LJX: call
2019-01-29 17:18:19.764 11179-11179/com.example.httpsender D/LJX: apply retryCount=0
2019-01-29 17:18:22.771 11179-11229/com.example.httpsender D/LJX: call
2019-01-29 17:18:22.772 11179-11229/com.example.httpsender D/LJX: apply retryCount=1
2019-01-29 17:18:25.775 11179-11231/com.example.httpsender D/LJX: call
2019-01-29 17:18:25.775 11179-11231/com.example.httpsender D/LJX: apply retryCount=2
2019-01-29 17:18:28.779 11179-11242/com.example.httpsender D/LJX: call
2019-01-29 17:18:28.779 11179-11242/com.example.httpsender D/LJX: apply retryCount=3
2019-01-29 17:18:28.781 11179-11242/com.example.httpsender D/LJX: onError
复制代码
到这也许有读者会问,我可以不使用Observable.timer
操作符吗?可以的,这里可以使用Observable.intervalRange
操作符替代,可以根据自己的业务需求返回一个Observable
对象,例如使用Observable.just
操作符发送多个数据项,内部会进行过滤,只有发射的第一个数据项才有效。
好了catch
和retry
两大类错误处理操作符已介绍完毕,如有疑问,请留言,我会第一时间作答。
题外话
如果想在Activity/Fragment的生命周期对RxJava做自动管理,防止内存泄漏,可查看我的另一片文章。Android RxLife 一款轻量级别的RxJava生命周期管理库,感谢支持。