Articles of rx java

在项目中find缺少的onError

我试图find项目中缺少onError() 。 这意味着应用程序崩溃,因为订阅不处理throwables所以我想find该订阅并添加onError方法。 不幸的是,堆栈跟踪在这里并没有真正的帮助,它只显示了throw new IOException的行,但仅此而已: FATAL EXCEPTION: main Process: my.app.example.dev, PID: 20309 java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread. at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:54) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:135) at android.app.ActivityThread.main(ActivityThread.java:5221) at java.lang.reflect.Method.invoke(Native Method) at java.lang.reflect.Method.invoke(Method.java:372) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:899) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:694) Caused by: rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:201) at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111) […]

通过改造和rxjava 2.x处理空响应

当使用rxjava 1.xi用于返回Observable来处理来自改造的空响应时: @POST( “login” ) Observable getToken( @Header( “Authorization” ) String authorization, @Header( “username” ) String username, @Header( “password” ) String password ); 但是因为rxjava 2.x不会用Void发出任何东西,所以有任何好的做法来处理那些空的响应吗?

使用Espresso和RxJava测试无尽的滚动RecyclerView

我有RecyclerView无尽的滚动。 因此,当用户到达列表中的last – 2位置时,我呼叫服务器以获取更多数据,并且在通话期间,我再添加一个项目 – 进度一个。 现在,我正在尝试用Espresso编写体面的UI测试,它将检查当前无限滚动是否正常工作: @Test public void checkIfProgressShown() { InstaFeed feed = TestDataFactory.makeInstaFeed(20); InstaFeed oldFeed = TestDataFactory.makeInstaFeed(20); when(mockDataManager.getFeedItemsFromServer()).thenReturn(Observable.just(feed.getInstaItems())); when(mockDataManager.getOldFeedItemsFromServer()).thenReturn(Observable.just(oldFeed.getInstaItems()) .delay(2, TimeUnit.SECONDS)); instaActivityActivityTestRule.launchActivity(null); int position = 0; for (InstaItem item : feed.getInstaItems()) { onView(withId(R.id.recycler_view)) .perform(RecyclerViewActions.scrollToPosition(position)); onView(withText(item.getLocation().getName())) .check(matches(isDisplayed())); // Line of crash position++; } onView(withId(R.id.progress)) .check(matches(isDisplayed())); } 所以基本上说,我试图延迟Observable的响应,显示新的一批项目,以便Espresso可以向下滚动到最后一项并使我的progress项可见。 问题在于测试只是堆栈,从UI的角度看它看起来像: 不要混淆这里没有进度条 – 在设备上禁用animation,所以没关系。 最后,小项是有进展的项目 60秒后它崩溃了 […]

RxJava:如何使用zip操作符处理错误?

我正在使用RxJava和RxAndroid与Retrofit2。 Observable responseOneObservable = getRetrofitClient().getDataOne() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); Observable responseTwoObservable = getRetrofitClient().getDataTwo() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); 在上面两个Observer上使用如下的zip操作符。 Observable<ArrayList> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList>() { @Override public ArrayList call(ResponseOne responseOne, ResponseTwo responseTwo) { ArrayList testDataList = new ArrayList(); // Add test data from response responseOne & responseTwo return testDataList; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<ArrayList>() { […]

在Android中使用RxJava排队任务

我正在开发具有后台数据同步function的Android应用程序。 我目前正在使用RxJava定期在服务器上发布一些数据。 除此之外,我想为用户提供一个“强制同步”按钮,它会立即触发同步。 我知道如何使用Observable.interval()以固定的时间间隔推送数据,我知道如何使用Observalbe.just()来推送那个被强制的数据,但我想将它们排队,如果是这样的话当前一个仍在运行时触发一个。 所以让我们举个例子,1min是自动同步的间隔,让我们说同步持续40秒(我夸张这里只是为了更容易点)。 现在,如果有任何机会,用户在自动仍然运行时按下“强制”按钮(反之亦然 – 当强制一个仍在运行时自动触发),我想将第二个同步请求排队第一个完成。 我画了这张图片,可能会有更多的视角: 如您所见,自动被触发(通过一些Observable.interval() ),并且在同步过程中,用户按下“强制”按钮。 现在我们要等待第一个请求完成,然后再次启动强制请求。 有一次,当强制请求正在运行时,再次触发了新的自动请求,只是将其添加到队列中。 从队列中完成最后一个后,一切都停止,然后稍后再次安排自动。 希望有人能指出我纠正操作员如何做到这一点。 我已经尝试使用Observable.combineLatest() ,但是队列列表在开始时被调度,当我将新的同步添加到队列时,它在前一个操作完成时没有继续。 Darko,非常感谢任何帮助

Rx Java mergeDelayError无法按预期工作

我在RxAndroid中使用RxJava和Android应用程序。 我正在使用mergeDelayError将两个逆向拟合网络调用组合成一个observable,如果发出一个,它将处理发出的项目,如果有的话,则处理错误。 这不起作用,它只会在遇到错误时触发onError操作。 现在为了测试这个我转移到一个非常简单的例子,当我有一个onError调用时,仍然不会调用successAction。 见下面的例子。 Observable.mergeDelayError( Observable.error(new RuntimeException()), Observable.just(“Hello”) ) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .finallyDo(completeAction) .subscribe(successAction, errorAction); 只有在使用两个成功的可观察对象时才会调用成功操作。 我错过了mergeDelayError应该如何工作的东西? 编辑: 我发现如果我删除observeOn并且subscribeOn一切都按预期工作。 我需要指定线程和思想,这是使用Rx的重点。 知道为什么指定那些Schedulers会破坏行为吗?

使用Retrofit rxjava concatWith时堆栈溢出

我想使用rxjava Observable处理Retrofit中的分页。 我听从了另一个问题的建议。 我有超过100个需要获取的页面,但链在第20页左右失败并停止使用logcat中的以下日志进一步订阅observable 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ threadid=28: stack overflow on call to Ljava/util/concurrent/atomic/AtomicLongFieldUpdater$CASUpdater;.compareAndSet:ZLJJ 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ method requires 56+20+32=108 bytes, fp is 0x94b52350 (80 left) 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ expanding stack end (0x94b52300 to 0x94b52000) 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ Shrank stack (to 0x94b52300, curFrame is 0x94b548dc) 有人知道为什么会这样吗? 更新:我知道这是由于递归而发生的,但是有更优雅的方式来处理改装和rxjava的分页吗?

为什么在Android上使用Retrofit的RxJava doOnError()不起作用,但是Subscriber onError会起作用

谁能解释我为什么这样的代码: networApi.getList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> { throwable.getMessage(); }) .doOnNext(list -> { coursesView.populateRecyclerView(list); courseList = (List) courses; }).subscribe(); 如果没有互联网进入doOnError,但会进一步抛出,以便应用程序关闭,但代码如下: networkApi.getList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.getMessage(); } @Override public void onNext(List list) { coursesView.populateRecyclerView(list); courseList = (List) list; } }); 按照我的预期工作,这意味着当没有互联网连接时,它什么都不做 欢呼Wojtek

如何在RxJava中的Observable中处理map()中的exception

我想做这个: Observable.just(bitmap) .map(new Func1() { @Override public File call(Bitmap photoBitmap) { //File creation throws IOException, //I just want it to hit the onError() inside subscribe() File photoFile = new File(App.getAppContext().getCacheDir(), “userprofilepic_temp.jpg”); if(photoFile.isFile()) {//delete the file first if it exists otherwise the new file won’t be created photoFile.delete(); } photoFile.createNewFile(); //saves the file in the cache […]

IllegalArgumentException:找不到rx.Observable RxJava,Retrofit2的调用适配器

我在调用其余的api时遇到上述错误。 我正在使用retrofit2和RxJava。 ServiceFactory.java public class ServiceFactory { public static T createRetrofitService(final Class clazz, final String endpoint){ Retrofit retrofit = new Retrofit.Builder() .baseUrl(endpoint) //.addConverterFactory(GsonConverterFactory.create()) .build(); T service = retrofit.create(clazz); return service; } } MovieService.java public interface MovieService{ //public final String API_KEY = “”; public final String SERVICE_END = “https://api.mymovies.org/3/”; @GET(“movie/{movieId}??api_key=xyz”) Observable<Response> getMovies(@Field(“movieId”) int movieId); } 内部MainActivity […]