如何在返回订阅服务器之前拦截可观察对象并在RxJava中修改它?

我正在尝试命中一个服务并返回一个对象列表,在它返回给订阅者之前,我想对列表中的每个对象进行另一个同步调用,以进行另一个服务调用来设置一个丢失的字段。 我已成功完成所有调用,但订阅者返回的对象具有此字段,我需要将其设置为null。 以下是我的代码示例:

示例服务:

rx.Observable<List> getExampleObject(); rx.Observable getMissingObjectByFoo(@Path("foo") String foo); 

示例类:

 public class ExampleObject { String foo; MissingObject bar; public String getFoo() { return this.foo; } public void setFoo(String value) { this.foo = value; } public MissingObject getBar() { return this.bar; } public void setBar(MissingObject value) { this.bar = value; } } 

示例实施:

 mService.getExampleObject().flatMap(new Func1<List, Observable>() { @Override public Observable<List> call(List exampleObjects) { for (ExampleObject entry : exampleObjects) { String foo = entry.getFoo(); mService.getMissingObjectByFoo(foo) .subscribeOn(mScheduler.backgroundThread()) .observeOn(mScheduler.mainThread()) .subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MissingObject missingObject) { entry.setBar(missingObject); } }); } return Observable.just(exampleObjects); }; 

您正在寻找zip运算符,如下所述: Zip运算符 。 我想你想把你所有电话的拉链平面映射到一个拉链,所以,像这样:

  mService.getExampleObject().flatMap(new Func1, Observable>() { @Override public Observable> call(List exampleObjects) { List> allTheObservables = new ArrayList>(); for (ExampleObject entry : exampleObjects) { allTheObservables.add(mService.getMissingObjectByFoo(foo).map(new Func1() { @Override public ExampleObject call(MissingObject missingObject) { return entry.setBar(missingObject); } })); } return Observable.zip(allTheObservables, new FuncN() { @Override public ExampleObject call(ExampleObject... args) { return Arrays.asList(args); } }); } }); 

如果不起作用,或者存在语法问题,这是一个具体的例子,使用github api:

  service.getContributorsObservable("square", "dagger") .flatMap(new Func1, Observable>>() { @Override public Observable> call(List contributors) { List> allTheObservables = new ArrayList<>(contributors.size()); for (final Contributor contributor : contributors) { allTheObservables.add(service.getContributorsObservable(contributor.login).map(new Func1() { @Override public String call(User user) { return contributor.login + " is " + user.name; } })); } return Observable.zip(allTheObservables, new FuncN>() { @Override public List call(Object... args) { return Arrays.asList((String[]) args); } }); } }); 

请记住,这将进行n + 1个网络调用,1表示ExampleObject列表,然后是该列表中的每个ExampleObject 1个。 如果可能的话,我强烈建议您与API的维护者交谈,以便在API端获取信息查找。 只知道这将使用一些带宽!

因为您更新条目的中间调用是异步的,我认为您不能坚持使用List ,而应该直接从Observable操作ExampleObject

 mService.getExampleObject() // Spread the list .flatMap(list -> Observable.from(list)) // Update your object // Here we zip the object with the missing object, // so that when the missing object is obtained, // we update the entry and emit it. .flatMap(entry -> Observable.zip( Observable.just(entry), mDocsService.getMissingObjectByFoo(entry.getFoo()), (entry, missingObject) -> { entry.setBar(missingObject); return entry; }) ) // if you really want a map after all .toList(); 

边注:

如果您可以在map中使用函数取决于外部variables(条目),则可以跳过zip。 这是我试图避免的,但无论如何它在这里:

  .flatMap(entry -> mDocsService.getMissingObjectByFoo(entry.getFoo()) .map(missingObject -> { entry.setBar(missingObject); return entry; }) )