具有多个订户的单个可观察者

我有一个Observable<<List> getFoo() ,它是从Retrofit Service创建的,在调用.getFoo()方法之后,我需要与Multiple Subscribers共享它。 但是,调用.share()方法会导致重新执行网络调用。 重播运算符也不起作用。 我知道可能的解决方案可能是.cache() ,但我不知道为什么会导致这种行为。

 // Create an instance of our GitHub API interface. Retrofit retrofit = new Retrofit.Builder() .baseUrl(API_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); // Create a call instance for looking up Retrofit contributors. Observable<List> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors + " -> 2"); } }); subscription1.unsubscribe(); subscription2.unsubscribe(); 

上面的代码可以重现上述行为。 您可以对其进行调试,并查看收到的列表属于不同的MemoryAddress。

我还将ConnectableObservables看作是一个潜在的解决方案,但是这需要我携带原始的observable,每次我想添加一个新的Subscriber时调用.connect()

使用.share()这种行为在Retrofit 1.9之前工作正常。 它停止了Retrofit 2 – beta的工作。 我还没有使用几小时前发布的Retrofit 2 Release Version进行测试。

编辑:01/02/2017

对于未来的读者,我在这里写了一篇文章,详细解释了这个案例!

您似乎(隐式)将.share()返回的ConnectedObservable .share()回普通的Observable 。 您可能想要了解热和冷可观测量之间的区别。

尝试

 ConnectedObservable> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List contributors) { System.out.println(contributors + " -> 2"); } }); testObservable.connect(); subscription1.unsubscribe(); subscription2.unsubscribe(); 

编辑:您不需要每次想要新订阅时调用connect() ,只需要它来启动observable。 我想你可以使用replay()来确保所有后续订阅者都获得所有产生的项目

 ConnectedObservable> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share() .replay() 

在与RxJava开发人员DávidKarnok核实后,我想提出一个完整的解释,说明这里发生了什么。

share()被定义为publish().refCount() ,即源Observable首先通过publish()转换为ConnectableObservable ,而不是必须“手动”调用connect() ,该部分由refCount()处理。 特别是, refCount将在ConnectableObservable本身收到第一个订阅时调用connect() ; 那么,只要有至少一个用户,它就会保留订阅; 最后,当订阅者数量下降到0时,它将取消订阅。 使用 Observables ,就像Retrofit返回的那样,这将停止任何正在运行的计算。

如果在其中一个用户出现其中一个周期后, refCount将再次调用connect ,从而触发对源Observable的新订阅。 在这种情况下,它将触发另一个网络请求。

现在,这通常不会在Retrofit 1(实际上是此提交之前的任何版本)中变得明显,因为默认情况下这些旧版本的Retrofit会将所有网络请求移动到另一个线程。 这通常意味着所有的subscribe()调用都会在第一个请求/ Observable仍在运行时发生,因此新的Subscriber者只会被添加到refCount ,因此不会触发其他请求/ Observables

但是,较新版本的Retrofit不会默认将工作移动到另一个线程 – 您必须通过调用例如subscribeOn(Schedulers.io())显式地执行此操作。 如果你不这样做,一切都将保留在当前线程上,这意味着第二个subscribe()只会在第一个Observable调用onCompleted之后发生,因此在所有Subscribers都取消订阅并且一切都关闭之后。 现在,正如我们在第一段中看到的那样,当第二个subscribe()被调用时, share()别无选择,只能引起对源Observable的另一个Subscription并触发另一个网络请求。

因此,要回到您在Retrofit 1中习惯的行为,只需添加subscribeOn(Schedulers.io())

这应该导致只执行网络请求 – 大多数时候。 原则上,您仍然可以获得多个请求(并且您可以使用Retrofit 1),但是只有当您的网络请求非常快且/或者subscribe()调用发生相当大的延迟时,才会再次获得第一个请求第二个subscribe()发生时请求完成。

因此,Dávid建议使用cache() (但它有你提到的缺点)或replay().autoConnect() 。 根据这些发行说明 , autoConnect工作方式与refCount的前半部分相同,或者更确切地说,就像是

与refCount()类似的行为,除了订阅者丢失时它不会断开连接。

这意味着只有在第一个subscribe()发生时才会触发请求,但是后来所有Subscriber都将接收所有发出的项目,无论是否在0个订阅者之间的任何时间都有。