onNext和onComplete的行为

我有一个Observable做一些事情,而不需要发出一个值。 另外我还有一个Observable可以使用的对象列表。 所以对于这个列表中的所有元素:doSomething()

Observable.from(uris) .flatMap(new Func1<Uri, Observable<Void>>() { @Override public Observable<Void> call(Uri uri) { return createDoSomethingObservable(uri); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(new Observer<Void>() { @Override public void onCompleted() { Log.d(TAG, "completed"); } @Override public void onError(Throwable e) { } @Override public void onNext(Void aVoid) { Log.d(TAG, "next"); } }); 

而创buildObservable的方法:

 Observable<Void> createDoSomethingObservable(final Uri uri) { return Observable.create(new Observable.OnSubscribe<Void>() { @Override public void call(Subscriber<? super Void> subscriber) { //doSomething subscriber.onNext(null); subscriber.onCompleted(); } }); } 

现在,当我用3个元素的列表运行这个时,我得到:

 next next next completed 

这是好的,因为这是我想要的,但我不知道为什么它的工作。 首先,我开始只需要完成,因为最后观察者完成了它的工作并完成了。 但是当然onNext是从来没有呼吁用户。 反过来也是如此。

所以我的问题是:

  1. 为什么onComplete只调用最后一个列表元素?
  2. 有没有更好的方法来解决这个问题?

Solutions Collecting From Web of "onNext和onComplete的行为"

onComplete被称为最后一个元素,因为这是链中最早的可观察元素( from(uris) )完成的时候。

预计你从flatMap发射的flatMap将会调用onComplete 。 一旦完成(并且call已经返回),则来自下一个发射的可以被处理。 一旦from完成发射可观测量,它调用onComplete并链完成,有效。

我认为,小代码可以帮助你理解onNext( )onComplete()
假设你有一个List<Uri> 。 我们手动将其转换为Observable<Uri>

 public static Observable<Uri> getUries(List<Uri> uriList){ return Observable.create(new Observable.OnSubscribe<Uri>() { @Override public void call(Subscriber<? super Uri> subscriber) { for(Uri uri : uriList){ subscriber.onNext(uri); } subscriber.onCompleted(); } }); } 

或者使用Lambdaexpression式:

 public static Observable<Uri> getUries(List<Uri> uriList){ return Observable.create(subscriber -> { for(Uri uri : uriList){ subscriber.onNext(uri); } subscriber.onCompleted(); }); } 

正如你所看到的,我们迭代input列表,并且为每个元素调用onNext( ) ,当我们完成将List转换为Observable ,我们调用了onComplete()

PS这个代码只是一个演示,请不要用它来转换ListObservable 。 使用运算符Observable.from()

更新

from( )实现中的运算符:

 ... while (true) { if (o.isUnsubscribed()) { return; } else if (it.hasNext()) { o.onNext(it.next()); } else if (!o.isUnsubscribed()) { o.onCompleted(); return; } else { // is unsubscribed return; } } ... 

链接: https : //github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L75-L87

onComplete (与onError相同)在可观察链中只调用一次,这是rxjava实现的方式

我认为你的方法是正确的,所以不需要更好的方法。