处理RxJava中的APIexception

我目前正试图绕过RxJava,但我在处理服务调用exception方面遇到了一些麻烦。

基本上,我有一个(Retrofit)服务返回一个ObservableServiceResponse的定义如下:

 public class ServiceResponse { private int status; private String message; private JsonElement data; public JsonElement getData() { return data; } public int getStatus() { return status; } public String getMessage() { return message; } } 

现在我想要的是将该generics响应映射到数据JsonElement字段中包含的List (我假设你不关心Account对象的样子,所以我不会用它来污染post)。 以下代码适用于成功案例,但我找不到处理APIexception的好方法:

 service.getAccounts() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(new Func1() { @Override public AccountData call(ServiceResponse serviceResponse) { // TODO: ick. fix this. there must be a better way... ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); switch (responseType) { case SUCCESS: Gson gson = new GsonBuilder().create(); return gson.fromJson(serviceResponse.getData(), AccountData.class); case HOST_UNAVAILABLE: throw new HostUnavailableException(serviceResponse.getMessage()); case SUSPENDED_USER: throw new SuspendedUserException(serviceResponse.getMessage()); case SYSTEM_ERROR: case UNKNOWN: default: throw new SystemErrorException(serviceResponse.getMessage()); } } }) .map(new Func1<AccountData, List>() { @Override public List call(AccountData accountData) { Gson gson = new GsonBuilder().create(); List res = new ArrayList(); for (JsonElement account : accountData.getAccounts()) { res.add(gson.fromJson(account, Account.class)); } return res; } }) .subscribe(accountsRequest); 

有一个更好的方法吗? 这确实有效,onError会激发给我的观察者,我会收到我扔的错误,但看起来我似乎并没有这样做。

提前致谢!

编辑:

让我澄清一下我想要实现的目标:

我希望有一个可以从UI调用的类(例如,Activity或Fragment,或其他)。 该类将Observer<List>作为参数,如下所示:

 public Subscription loadAccounts(Observer<List> observer, boolean forceRefresh) { ... } 

当UI被分离/销毁/等时,该方法将返回可以取消订阅的订阅。

参数化观察者将处理onNext,以便在Accounts列表中传递成功的响应。 OnError会处理任何exception,但也会传递任何APIexception(例如,如果响应状态!= 200,我们将创建一个Throwable并将其传递给onError)。 理想情况下,我不想只是“抛出”exception,我想将它直接传递给Observer。 这就是我看到的所有例子。

复杂的是我的Retrofit服务返回一个ServiceResponse对象,所以我的观察者不能订阅它。 我提出的最好的方法是在Observer周围创建一个Observer包装器,如下所示:

 @Singleton public class AccountsDatabase { private AccountsService service; private List accountsCache = null; private PublishSubject accountsRequest = null; @Inject public AccountsDatabase(AccountsService service) { this.service = service; } public Subscription loadAccounts(Observer<List> observer, boolean forceRefresh) { ObserverWrapper observerWrapper = new ObserverWrapper(observer); if (accountsCache != null) { // We have a cached value. Emit it immediately. observer.onNext(accountsCache); } if (accountsRequest != null) { // There's an in-flight network request for this section already. Join it. return accountsRequest.subscribe(observerWrapper); } if (accountsCache != null && !forceRefresh) { // We had a cached value and don't want to force a refresh on the data. Just // return an empty subscription observer.onCompleted(); return Subscriptions.empty(); } accountsRequest = PublishSubject.create(); accountsRequest.subscribe(new ObserverWrapper(new EndObserver<List>() { @Override public void onNext(List accounts) { accountsCache = accounts; } @Override public void onEnd() { accountsRequest = null; } })); Subscription subscription = accountsRequest.subscribe(observerWrapper); service.getAccounts() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(accountsRequest); return subscription; } static class ObserverWrapper implements Observer { private Observer<List> observer; public ObserverWrapper(Observer<List> observer) { this.observer = observer; } @Override public void onCompleted() { observer.onCompleted(); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onNext(ServiceResponse serviceResponse) { ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); switch (responseType) { case SUCCESS: Gson gson = new GsonBuilder().create(); AccountData accountData = gson.fromJson(serviceResponse.getData(), AccountData.class); List res = new ArrayList(); for (JsonElement account : accountData.getAccounts()) { res.add(gson.fromJson(account, Account.class)); } observer.onNext(res); observer.onCompleted(); break; default: observer.onError(new ApiException(serviceResponse.getMessage(), responseType)); break; } } } } 

我仍然觉得我没有正确使用这个。 我以前没有见过其他人使用ObserverWrapper。 也许我不应该使用RxJava,尽管SoundCloud和Netflix的人员在他们的演示文稿中真的把它卖给了我,我非常渴望学习它。

请阅读下面我添加了一个编辑。

使用RxJava在Action / Func / Observer中抛出是完全正确的。 该exception将由框架传播到您的Observer。 如果你只限于调用onError,那么你就会扭曲自己来实现这一点。

据说这是一个建议,就是简单地删除这个包装器,并在Service.getAccount …的Observables链中添加一个简单的validationAction。

我将使用与地图链接的doOnNext(new ValidateServiceResponseOrThrow)(新的MapValidResponseToAccountList)。 这些是简单的类,它们实现了必要的代码,以使Observable链更具可读性。

这是使用我建议简化的loadAccount方法。

 public Subscription loadAccounts(Observer> observer, boolean forceRefresh) { if (accountsCache != null) { // We have a cached value. Emit it immediately. observer.onNext(accountsCache); } if (accountsRequest != null) { // There's an in-flight network request for this section already. Join it. return accountsRequest.subscribe(observer); } if (accountsCache != null && !forceRefresh) { // We had a cached value and don't want to force a refresh on the data. Just // return an empty subscription observer.onCompleted(); return Subscriptions.empty(); } accountsRequest = PublishSubject.create(); accountsRequest.subscribe(new EndObserver>() { @Override public void onNext(List accounts) { accountsCache = accounts; } @Override public void onEnd() { accountsRequest = null; } }); Subscription subscription = accountsRequest.subscribe(observer); service.getAccounts() .doOnNext(new ValidateServiceResponseOrThrow()) .map(new MapValidResponseToAccountList()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(accountsRequest); return subscription; } private static class ValidateResponseOrThrow implements Action1 { @Override public void call(ServiceResponse response) { ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); if (responseType != SUCCESS) throw new ApiException(serviceResponse.getMessage(), responseType)); } } private static class MapValidResponseToAccountList implements Func1> { @Override public Message call(ServiceResponse response) { // add code here to map the ServiceResponse into the List as you've provided already } } 

编辑:除非有人说不,否则我认为使用flatMap返回错误是最佳做法。 我过去曾抛出过Exceptions from Action,但我不相信这是推荐的方式。

如果使用flatMap,则会有一个更清晰的Exception堆栈。 如果从Action内部抛出,Exception堆栈实际上将包含rx.exceptions.OnErrorThrowable$OnNextValueexception,这是不理想的。

让我用flatMap演示上面的例子。

 private static class ValidateServiceResponse implements rx.functions.Func1> { @Override public Observable call(ServiceResponse response) { ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); if (responseType != SUCCESS) return Observable.error(new ApiException(serviceResponse.getMessage(), responseType)); return Observable.just(response); } } service.getAccounts() .flatMap(new ValidateServiceResponse()) .map(new MapValidResponseToAccountList()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(accountsRequest); 

正如你所看到的,差异是微妙的。 ValidateServiceResponse现在实现Func1而不是Action1 ,我们不再使用throw关键字。 我们使用Observable.error(new Throwable)代替。 我相信这与预期的Rx合约更合适。

你可以阅读这篇关于error handling的好文章http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/