Articles of rx java

Observable.empty()导致java.util.NoSuchElementException:Sequence不包含任何元素

我正在使用Retrofit 2.0.0-beta2和RxJava 1.0.14。 我以这种方式处理错误,因为我需要在doFinally中执行一些代码: .onErrorResumeNext(Observable.empty()); 但是当我得到带有错误的http响应时(例如401),我的应用程序崩溃而堆栈跟踪中没有我的类。 如果使用Observable.never没有任何不好的事情发生。 这是完整的堆栈跟踪: java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. at rx.internal.schedulers.ScheduledAction.run (ScheduledAction.java:60) at android.os.Handler.handleCallback (Handler.java:739) at android.os.Handler.dispatchMessage (Handler.java:95) at android.os.Looper.loop (Looper.java:135) at android.app.ActivityThread.main (ActivityThread.java:5221) at java.lang.reflect.Method.invoke (Unknown source) at java.lang.reflect.Method.invoke (Method.java:372) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run (ZygoteInit.java:899) at com.android.internal.os.ZygoteInit.main (ZygoteInit.java:694) rx.exceptions.OnErrorNotImplementedException: Sequence contains no elements at rx.Observable$27.onError (Observable.java:7535) at rx.observers.SafeSubscriber._onError […]

在文本更改事件之前自动调用RxAndroid textview事件

我在编辑文本搜索中使用rxandroid进行去抖操作 我用了 private void setUpText() { _mSubscription = RxTextView.textChangeEvents(searchStation)// .debounce(500, TimeUnit.MILLISECONDS)// default Scheduler is Computation .observeOn(AndroidSchedulers.mainThread())// .subscribe(getOps().getdata()); } 和观察者一样 public Observer getdata() { return new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(TextViewTextChangeEvent onTextChangeEvent) { // stationsugession(onTextChangeEvent.text().toString()); //here i called link to get […]

改造默认线程

我在我的Android应用程序和我的代码中使用Retrofit和RxJava : public void getConfig(NetworkSubscriber subscriber) { Observable observable = mApi.getConfig(); observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); } public void getCode(String mobile, int type, NetworkSubscriber subscriber) { Observable observable = mApi.getCode(mobile, type); observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); } 我不想写.subscribeOn(Schedulers.newThread())和.observeOn(AndroidSchedulers.mainThread())每个业务方法 我能怎么做?

Android rxJava使用改造进行error handling

我正在使用较新的RX java而不是 Observable.create(new Observable.OnSubscribeFunc() {…}); 使用:(由于弃用) Observable.create(new Observable.OnSubscribe() {…}); (这可能很重要,因为大多数示例,教程,explonation使用旧的……) 好吧,让我们看看我的问题。 我有一个Java类,它的相关部分: private interface ApiManagerService { @FormUrlEncoded @POST(“/login”) User getUser(@Field(“username”) String userName, @Field(“password”) String password); } private static RestAdapter restAdapter = new RestAdapter.Builder() .setEndpoint(HOST) .setLogLevel(RestAdapter.LogLevel.FULL) .build(); private static ApiManagerService apiManager = restAdapter.create(ApiManagerService.class); public static Subscription login(final String userName, final String password, Observer observer) { return […]

使用SqlBrite / SqlDelight(脱机数据库)和Retrofit(Http请求)的存储库模式

我正在使用SqlBrite / SqlDelight在RxJava中实现存储库模式以进行离线数据存储并为Http请求进行改造 这是一个样本: protected Observable<List> getItemsFromDb() { return database.createQuery(tableName(), selectAllStatement()) .mapToList(cursor -> selectAllMapper().map(cursor)); } public Observable<List>getItems(){ Observable<List> server = getRequest() .doOnNext(items -> { BriteDatabase.Transaction transaction = database.newTransaction(); for (Item item : items){ database.insert(tableName(), contentValues(item)); } transaction.markSuccessful(); transaction.end(); }) .flatMap(items -> getItemsFromDbById()) .delaySubscription(200, TimeUnit.MILLISECONDS); Observable<List> db = getItemsFromDbById(id) .filter(items -> items != null && items.size() […]

CursorLoader的onLoadFinished回调中的RxJava2

要从数据库中获取数据,我在应用程序中使用CursorLoader 。 一旦onLoadFinished()回调方法调用app的逻辑,就将Cursor对象转换为业务模型要求中的对象List 。 如果有大量数据,那么转换(繁重操作)需要一些时间。 这会降低UI线程的速度。 我尝试使用传递Cursor对象的RxJava2在非UI Thread开始转换,但是得到了Exception : Caused by: android.database.StaleDataException: Attempting to access a closed CursorWindow.Most probable cause: cursor is deactivated prior to calling this method. 这是Fragment代码的一部分: @Override public Loader onCreateLoader(int id, Bundle args) { QueryBuilder builder; switch (id) { case Constants.FIELDS_QUERY_TOKEN: builder = QueryBuilderFacade.getFieldsQB(activity); return new QueryCursorLoader(activity, builder); default: return null; } […]

如何使用RxJava处理回收站视图的项目点击

我有兴趣了解回应物品点击回收者视图的最佳方法是什么。 通常我会向ViewHolder添加一个onclick()侦听器,并通过接口将结果传回活动/片段。 我想在onBindViewHolder中添加一个Observable,但我不想为每个项目绑定创建一个新的Observable。

处理RxJava中的APIexception

我目前正试图绕过RxJava,但我在处理服务调用exception方面遇到了一些麻烦。 基本上,我有一个(Retrofit)服务返回一个Observable 。 ServiceResponse的定义如下: public class ServiceResponse { private int status; private String message; private JsonElement data; public JsonElement getData() { return data; } public int getStatus() { return status; } public String getMessage() { return message; } } 现在我想要的是将该generics响应映射到数据JsonElement字段中包含的List (我假设你不关心Account对象的样子,所以我不会用它来污染post)。 以下代码适用于成功案例,但我找不到处理APIexception的好方法: service.getAccounts() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(new Func1() { @Override public AccountData call(ServiceResponse serviceResponse) { // […]

将Firebase实时数据侦听器与RxJava结合使用

我在我的应用程序中使用Firebase以及RxJava。 Firebase能够在后端数据发生变化(添加,删除,更改……)时通知您的应用。 我试图将Firebase的function与RxJava结合起来。 我正在侦听的数据称为Leisure , Observable发出LeisureUpdate ,其中包含Leisure和更新types(添加,删除,移动,更改)。 这是我的方法,允许订阅此事件。 private Observable leisureUpdatesObservable; private ChildEventListener leisureUpdatesListener; private int leisureUpdatesSubscriptionsCount; @NonNull public Observable subscribeToLeisuresUpdates() { if (leisureUpdatesObservable == null) { leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber subscriber) { leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() { @Override public void onChildAdded(DataSnapshot dataSnapshot, String s) { final Leisure leisure […]

用RxJava替换EventBus – N个用户总是在听

我正在Android应用程序中用RxJava替换EventBus模式。 我有事件提醒任何感兴趣的各方我的缓存单例中的数据更新。 每次调用Web服务时,都会更新数据,并通过发布的事件提醒订阅者。 我在RxJava中使用AsyncSubject接近这个设置。 观察者从主题中获得单个事件,但随后他们获得onComplete事件并取消订阅。 这首先加载UI,但是当需要刷新数据时,不会通知订阅者。 如何告诉这些Subscribers者继续收听来自Subject更多事件? 我需要一个会报告最新项目的Subject 。 PublishSubject仅在订阅后发出项目,因此它不能满足我的需求。 我的订阅者在不同的时间(可能在第一个数据事件之后)开始观察,因此我需要主题发出最后观察的项目,然后保持流为后续项目打开。 似乎AsyncSubject和PublishSubject的组合就是我所需要的。 有没有办法用内置的类来实现这一点,还是我需要创建自己的主题? WebServiceObservable OR CacheObservable ^ | AsyncSubject ^ | / \ / \ / \ UiObserver1 UiObserver2