链式两个改装可观察物与RxJava

我想一个接一个地执行2个网络呼叫。 两个网络调用都返回Observable。 第二次呼叫使用来自第一次呼叫的成功结果的数据,第二次呼叫的成功结果中的方法使用来自第一次和第二次呼叫的成功结果的数据。 此外,我应该能够以不同的方式处理onError“事件”。 我怎样才能避免回调地狱,如下例所示:

API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1() { @Override public void call(final AuthResponse authResponse) { API().getUser(authResponse.getAccessToken()) .subscribe(new Action1<List>() { @Override public void call(List users) { doSomething(authResponse, users); } }, new Action1() { @Override public void call(Throwable throwable) { onErrorGetUser(); } }); } }, new Action1() { @Override public void call(Throwable throwable) { onErrorAuth(); } }); 

我知道zip,但我想避免创建“Combiner类”。

更新1.试图实现akarnokd的答案:

  API() .auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(authResponse -> API() .getUser(authResponse.getAccessToken()) .doOnError(throwable -> { getView().setError(processFail(throwable)); }), ((authResponse, users) -> { // Ensure returned user is the which was authenticated if (authResponse.getUserId().equals(users.get(0).getId())) { SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0)); getView().toNews(); } else { getView().setError(R.string.something_went_wrong); } })); 

但是在flatMap方法编译器中说它无法解析authResponse和users( authResponse.getAccessToken()users.get(0)等)的方法。 我是rx编程和lambdas的新手 – 请告诉我这是什么问题。 无论如何代码现在看起来更干净。

更新2。

 API() .auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> getView().setError(processFail(throwable))) .flatMap((AuthResponse authResponse) -> API() .getUser(authResponse.getAccessToken()) .doOnError(throwable -> getView().setError(processFail(throwable))), ((AuthResponse authResponse, List users) -> { // Ensure returned user is the which was authenticated if (authResponse.getUserId().equals(users.get(0).getId())) { SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0)); getView().toNews(); } return Observable.just(this); })); 

这样做了,但现在我的网络呼叫根本没有执行。

除了Anthony R.的答案之外,还有一个flatMap重载,它接受一个Func2并为你配对你的主要值和扁平值。 另外,查看onErrorXXX和onExceptionXXX运算符以进行错误操作,并将它们与第一个和第二个Observables链接起来

 first.onErrorReturn(1) .flatMap(v -> service(v).onErrorReturn(2), (a, b) -> a + b); 

你看过flatMap()吗? 如果您厌恶它(或zip())是需要创建一个不必要的类来保存两个对象,android.util.Pair可能是一个答案。 但是,我不确定如何准确地获得您正在寻找的error handling。

  API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1>>() { @Override public Observable> call(AuthResponse authResponse) { return API().getUser(authResponse.getAccessToken()); } }, new Func2, Pair>>() { @Override public Pair> call(AuthResponse authResponse, List users) { return new Pair<>(authResponse, users); } }).subscribe(new Action1>>() { @Override public void call(Pair> pair) { doSomething(pair.first, pair.second); } }, new Action1() { @Override public void call(Throwable throwable) { // not sure how to tell which one threw the error } });