用全局的rxjava处理networkingexception进行翻新

我试图在全球范围内处理应用程序中的exception,以便翻新引发一个错误,我在一些特定的类中使用逻辑来处理这些错误。

我有一个接口

@POST("/token") AuthToken refreshToken(@Field("grant_type") String grantType, @Field("refresh_token") String refreshToken); 

和观察

 /** * Refreshes auth token * * @param refreshToken * @return */ public Observable<AuthToken> refreshToken(String refreshToken) { return Observable.create((Subscriber<? super AuthToken> subscriber) -> { try { subscriber.onNext(apiManager.refreshToken(REFRESH_TOKEN, refreshToken)); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } }).subscribeOn(Schedulers.io()); } 

当我从服务器(无效令牌或一些其他networking相关的错误)401得到我想刷新令牌,并重复其余的电话。 有没有办法做到这一点与rxjava所有rest调用与某种观察将会捕捉到这个错误的全球性,处理它,并重复抛出它的调用呢?

现在我正在使用主题赶上.subscribe()这样的错误

 private static BehaviorSubject errorEvent = BehaviorSubject.create(); public static BehaviorSubject<RetrofitError> getErrorEvent() { return errorEvent; } 

并在一些电话

 getCurrentUser = userApi.getCurrentUser().observeOn(AndroidSchedulers.mainThread()) .subscribe( (user) -> { this.user = user; }, errorEvent::onNext ); 

那么在我的主要活动中,我订阅该行为主题并parsing错误

 SomeApi.getErrorEvent().subscribe( (e) -> { //parse the error } ); 

但我不能重复调用抛出错误的observable。

Solutions Collecting From Web of "用全局的rxjava处理networkingexception进行翻新"

你需要使用运算符onErrorResumeNext(Func1 resumeFunction) ,在官方wiki中有更好的解释:

onErrorResumeNext()方法返回一个Observable,它反映了源Observable的行为,除非Observable在这种情况下调用onError(),而不是将该错误传播给订阅者,onErrorResumeNext()将开始镜像第二个备份Observable

在你的情况下,我会把这样的东西:

 getCurrentUser = userApi.getCurrentUser() .onErrorResumeNext(refreshTokenAndRetry(userApi.getCurrentUser())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(...) 

哪里:

  private <T> Func1<Throwable,? extends Observable<? extends T>> refreshTokenAndRetry(final Observable<T> toBeResumed) { return new Func1<Throwable, Observable<? extends T>>() { @Override public Observable<? extends T> call(Throwable throwable) { // Here check if the error thrown really is a 401 if (isHttp401Error(throwable)) { return refreshToken().flatMap(new Func1<AuthToken, Observable<? extends T>>() { @Override public Observable<? extends T> call(AuthToken token) { return toBeResumed; } }); } // re-throw this error because it's not recoverable from here return Observable.error(throwable); } }; } 

还要注意的是,这个函数在其他情况下可以很容易地使用,因为它没有被恢复的Observable发出的实际值input。

 @Override public Observable<List<MessageEntity>> messages(String accountId, int messageType) { return mMessageService.getLikeMessages(messageType) .onErrorResumeNext(mTokenTrick. refreshTokenAndRetry(mMessageService.getLikeMessages(messageType))); }