将Firebase实时数据侦听器与RxJava相结合

我在我的应用程序中使用了Firebase ,以及RxJava。 只要后端数据发生变化(添加,删除,更改…),Firebase就能够通知您的应用。 我正在尝试将Firebase的function与RxJava结合起来。

我所听的数据被称为LeisureLeisureUpdate Observable发出包含Leisure和更新types(增加,移除,移动,改变)的LeisureUpdate

这是我的方法,它允许订阅这个事件。

 private Observable<LeisureUpdate> leisureUpdatesObservable; private ChildEventListener leisureUpdatesListener; private int leisureUpdatesSubscriptionsCount; @NonNull public Observable<LeisureUpdate> subscribeToLeisuresUpdates() { if (leisureUpdatesObservable == null) { leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() { @Override public void call(final Subscriber<? super LeisureUpdate> subscriber) { leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() { @Override public void onChildAdded(DataSnapshot dataSnapshot, String s) { final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED)); } @Override public void onChildChanged(DataSnapshot dataSnapshot, String s) { final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED)); } @Override public void onChildRemoved(DataSnapshot dataSnapshot) { final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED)); } @Override public void onChildMoved(DataSnapshot dataSnapshot, String s) { final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED)); } @Override public void onCancelled(FirebaseError firebaseError) { subscriber.onError(new Error(firebaseError.getMessage())); } }); } }); } leisureUpdatesSubscriptionsCount++; return leisureUpdatesObservable; } 

首先,我想使用Observable.fromCallable()方法来创buildObservable ,但是我猜这是不可能的,因为Firebase使用callback,对吗?

我保留Observable的一个实例,以便在多个Subscriber可以订阅的情况下始终有一个Observable

如果每个人都取消订阅,并且需要停止监听Firebase中的事件,则会出现问题。 我还没有find使Observable了解是否还有订阅。 所以我一直在计算多less次调用subscribeToLeisuresUpdates() ,与leisureUpdatesSubscriptionsCount

那么每次有人想退订都必须打电话

 @Override public void unsubscribeFromLeisuresUpdates() { if (leisureUpdatesObservable == null) { return; } leisureUpdatesSubscriptionsCount--; if (leisureUpdatesSubscriptionsCount == 0) { firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener); leisureUpdatesObservable = null; } } 

这是我发现让Observable在有用户的时候发射物品的唯一方法,但是我觉得必须有一个更简单的方法,特别是当没有更多的用户正在收听observable的时候。

任何人遇到类似的问题或有不同的方法?

Solutions Collecting From Web of "将Firebase实时数据侦听器与RxJava相结合"

另外,你可以使用我的rxFirebase库

您可以使用Observable.fromEmitter,沿着这些线

  return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() { @Override public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) { final ValueEventListener listener = new ValueEventListener() { @Override public void onDataChange(DataSnapshot dataSnapshot) { // process update LeisureUpdate leisureUpdate = ... leisureUpdateEmitter.onNext(leisureUpdate); } @Override public void onCancelled(DatabaseError databaseError) { leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage())); mDatabaseReference.removeEventListener(this); } }; mDatabaseReference.addValueEventListener(listener); leisureUpdateEmitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { mDatabaseReference.removeEventListener(listener); } }); } }, Emitter.BackpressureMode.BUFFER); 

把它放在你的Observable.create()中。

 subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { ref.removeEventListener(leisureUpdatesListener); } })); 

我build议你检查参考(或只是使用它)下一个库之一:

RxJava: https : //github.com/nmoskalenko/RxFirebase

RxJava 2.0: https : //github.com/FrangSierra/Rx2Firebase

其中一个使用RxJava,另一个使用RxJava 2.0的新RC。 如果你对它感兴趣,你可以在这里看到两者的区别。

您可以尝试使用此Rx方法来使用Firebase:RxFirebase https://gist.github.com/gsoltis/86210e3259dcc6998801

以下是使用RxJava2和Firebase的CompletionListener的示例代码:

 Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(final CompletableEmitter e) throws Exception { String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey(); FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order, new DatabaseReference.CompletionListener() { @Override public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) { if (e.isDisposed()) { return; } if (databaseError == null) { e.onComplete(); } else { e.onError(new Throwable(databaseError.getMessage())); } } }); } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());