RxJava:找出BehaviorSubject是否是重复的值

我正在制作一个Android界面,显示从networking中获取的一些数据。 我想让它显示最新的可用数据,并永远不会是空的(除非根本没有数据被提取),所以我使用BehaviorSubject为订户(我的用户界面)提供最新的可用信息,同时刷新它更新它的背景。

这是有效的,但由于我的用户界面的另一个要求,我现在必须知道发布的结果是否从networking获得新鲜或不。 (换句话说,我需要知道发布的结果是否为BehaviorSubject的保存项目。)

我怎样才能做到这一点? 如果我需要将它分解为多个Observables,那就好,只要我能够获得BehaviorSubject的caching行为(获取最后的可用结果),同时也能够判断返回的结果是来自caching还是不。 我认为这样做的一个拙劣的方法是检查响应的时间戳是否相对较快,但是这实在是太渺茫了,我宁愿想办法用RxJava来完成。

Solutions Collecting From Web of "RxJava:找出BehaviorSubject是否是重复的值"

正如你在问题中提到的,这可以用多个Observable来完成。 实质上,你有两个可观察物:“ 可以观察到新鲜的反应 ”和“ 可以观察到caching的反应 ”。 如果有什么东西可以“观察”,你可以把它表示成一个可观察的东西。 我们来命名第一个original ,第二个replayed

看到这个JSBin (JavaScript,但概念可以直接翻译成Java。据我所知,没有一个JavaBin用于这些目的)。

 var original = Rx.Observable.interval(1000) .map(function (x) { return {value: x, from: 'original'}; }) .take(4) .publish().refCount(); var replayed = original .map(function (x) { return {value: x.value, from: 'replayed'}; }) .replay(null, 1).refCount(); var merged = Rx.Observable.merge(original, replayed) .replay(null, 1).refCount() .distinctUntilChanged(function (obj) { return obj.value; }); console.log('subscribe 1st'); merged.subscribe(function (x) { console.log('subscriber1: value ' + x.value + ', from: ' + x.from); }); setTimeout(function () { console.log(' subscribe 2nd'); merged.subscribe(function (x) { console.log(' subscriber2: value ' + x.value + ', from: ' + x.from); }); }, 2500); 

这里的总体思路是:用一个字段注释事件from指明它的起源。 如果是original ,这是一个新的回应。 如果replayed ,则是一个caching响应。 可观察original只会发出from: 'original'replayed观察将只会发射from: 'replayed' 。 在Java中,我们需要更多样板,因为您需要创build一个类来表示这些注释事件。 否则RxJS中的相同运算符可以在RxJava中find。

最初的Observable是publish().refCount()因为我们只想要这个stream的一个实例,与所有观察者共享。 实际上,在RxJS和Rx.NET中, share()publish().refCount()的别名。

重放的Observable replay(1).refCount()因为它也是像原来一样共享,但是replay(1)给了我们caching的行为。

merged Observable包含原始的和重播的,这就是你应该公开给所有订户的东西。 由于replayed会立即发出,每当original ,我们使用distinctUntilChanged事件的值忽略即时连续。 我们replay(1).refCount() 也是合并的原因是因为我们希望原始和重放的合并也是在所有观察者之间共享的stream的单个共享实例。 我们可以使用publish().refCount()来达到这个目的,但是我们不能失去replayed包含的重放效果,因此replay(1).refCount() ,而不是publish().refCount()

不分明涵盖你的情况? BehaviorSubject只在订阅后重复最新的元素。

我相信你想要的是这样的:

 private final BehaviorSubject<T> fetched = BehaviorSubject.create(); private final Observable<FirstTime<T>> _fetched = fetched.lift(new Observable.Operator<FirstTime<T>, T>() { private AtomicReference<T> last = new AtomicReference<>(); @Override public Subscriber<? super T> call(Subscriber<? super FirstTime<T>> child) { return new Subscriber<T>(child) { @Override public void onCompleted() { child.onCompleted(); } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onNext(T t) { if (!Objects.equals(t, last.getAndSet(t))) { child.onNext(FirstTime.yes(t)); } else { child.onNext(FirstTime.no(t)); } } }; } }); public Observable<FirstTime<T>> getObservable() { return _fetched; } public static class FirstTime<T> { final boolean isItTheFirstTime; final T value; public FirstTime(boolean isItTheFirstTime, T value) { this.isItTheFirstTime = isItTheFirstTime; this.value = value; } public boolean isItTheFirstTime() { return isItTheFirstTime; } public T getValue() { return value; } public static <T> FirstTime<T> yes(T value) { return new FirstTime<>(true, value); } public static <T> FirstTime<T> no(T value) { return new FirstTime<>(false, value); } } 

包装类FirstTime有一个布尔值,可以用来查看Observable的任何订户是否曾经见过它。

希望有所帮助。

将BehaviorSubject对象的信息存储在数据结构中,并进行查找,例如Dictionnary。 每个值将是一个关键,值将是迭代次数。

因此,当你看一个特定的关键字时,如果你的词典已经包含它,并且它的价值已经在一个,那么你知道一个值是一个重复的值。

我不确定你想要达到什么目的。 也许你只是想有一个“最新”数据的智能来源,第二个来源告诉你什么时候刷新数据?

  BehaviorSubject<Integer> dataSubject = BehaviorSubject.create(42); // initial value, "never empty" Observable<String> refreshedIndicator = dataSubject.map(data -> "Refreshed!"); refreshedIndicator.subscribe(System.out::println); Observable<Integer> latestActualData = dataSubject.distinctUntilChanged(); latestActualData.subscribe( data -> System.out.println( "Got new data: " + data)); // simulation of background activity: Observable.interval(1, TimeUnit.SECONDS) .limit(100) .toBlocking() .subscribe(aLong -> dataSubject.onNext(ThreadLocalRandom.current().nextInt(2))); 

输出:

 Refreshed! Got new data: 42 Refreshed! Got new data: 0 Refreshed! Refreshed! Refreshed! Got new data: 1 Refreshed! Got new data: 0 Refreshed! Got new data: 1