Articles of rx android

RxJava:如何使用zip操作符处理错误?

我正在使用RxJava和RxAndroid与Retrofit2。 Observable responseOneObservable = getRetrofitClient().getDataOne() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); Observable responseTwoObservable = getRetrofitClient().getDataTwo() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); 在上面两个Observer上使用如下的zip操作符。 Observable<ArrayList> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList>() { @Override public ArrayList call(ResponseOne responseOne, ResponseTwo responseTwo) { ArrayList testDataList = new ArrayList(); // Add test data from response responseOne & responseTwo return testDataList; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<ArrayList>() { […]

在Android中使用RxJava排队任务

我正在开发具有后台数据同步function的Android应用程序。 我目前正在使用RxJava定期在服务器上发布一些数据。 除此之外,我想为用户提供一个“强制同步”按钮,它会立即触发同步。 我知道如何使用Observable.interval()以固定的时间间隔推送数据,我知道如何使用Observalbe.just()来推送那个被强制的数据,但我想将它们排队,如果是这样的话当前一个仍在运行时触发一个。 所以让我们举个例子,1min是自动同步的间隔,让我们说同步持续40秒(我夸张这里只是为了更容易点)。 现在,如果有任何机会,用户在自动仍然运行时按下“强制”按钮(反之亦然 – 当强制一个仍在运行时自动触发),我想将第二个同步请求排队第一个完成。 我画了这张图片,可能会有更多的视角: 如您所见,自动被触发(通过一些Observable.interval() ),并且在同步过程中,用户按下“强制”按钮。 现在我们要等待第一个请求完成,然后再次启动强制请求。 有一次,当强制请求正在运行时,再次触发了新的自动请求,只是将其添加到队列中。 从队列中完成最后一个后,一切都停止,然后稍后再次安排自动。 希望有人能指出我纠正操作员如何做到这一点。 我已经尝试使用Observable.combineLatest() ,但是队列列表在开始时被调度,当我将新的同步添加到队列时,它在前一个操作完成时没有继续。 Darko,非常感谢任何帮助

Rx Java mergeDelayError无法按预期工作

我在RxAndroid中使用RxJava和Android应用程序。 我正在使用mergeDelayError将两个逆向拟合网络调用组合成一个observable,如果发出一个,它将处理发出的项目,如果有的话,则处理错误。 这不起作用,它只会在遇到错误时触发onError操作。 现在为了测试这个我转移到一个非常简单的例子,当我有一个onError调用时,仍然不会调用successAction。 见下面的例子。 Observable.mergeDelayError( Observable.error(new RuntimeException()), Observable.just(“Hello”) ) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .finallyDo(completeAction) .subscribe(successAction, errorAction); 只有在使用两个成功的可观察对象时才会调用成功操作。 我错过了mergeDelayError应该如何工作的东西? 编辑: 我发现如果我删除observeOn并且subscribeOn一切都按预期工作。 我需要指定线程和思想,这是使用Rx的重点。 知道为什么指定那些Schedulers会破坏行为吗?

使用Retrofit rxjava concatWith时堆栈溢出

我想使用rxjava Observable处理Retrofit中的分页。 我听从了另一个问题的建议。 我有超过100个需要获取的页面,但链在第20页左右失败并停止使用logcat中的以下日志进一步订阅observable 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ threadid=28: stack overflow on call to Ljava/util/concurrent/atomic/AtomicLongFieldUpdater$CASUpdater;.compareAndSet:ZLJJ 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ method requires 56+20+32=108 bytes, fp is 0x94b52350 (80 left) 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ expanding stack end (0x94b52300 to 0x94b52000) 04-04 04:12:11.766 2951-3012/com.example.app I/dalvikvm﹕ Shrank stack (to 0x94b52300, curFrame is 0x94b548dc) 有人知道为什么会这样吗? 更新:我知道这是由于递归而发生的,但是有更优雅的方式来处理改装和rxjava的分页吗?

为什么在Android上使用Retrofit的RxJava doOnError()不起作用,但是Subscriber onError会起作用

谁能解释我为什么这样的代码: networApi.getList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> { throwable.getMessage(); }) .doOnNext(list -> { coursesView.populateRecyclerView(list); courseList = (List) courses; }).subscribe(); 如果没有互联网进入doOnError,但会进一步抛出,以便应用程序关闭,但代码如下: networkApi.getList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.getMessage(); } @Override public void onNext(List list) { coursesView.populateRecyclerView(list); courseList = (List) list; } }); 按照我的预期工作,这意味着当没有互联网连接时,它什么都不做 欢呼Wojtek

如何在RxJava中的Observable中处理map()中的exception

我想做这个: Observable.just(bitmap) .map(new Func1() { @Override public File call(Bitmap photoBitmap) { //File creation throws IOException, //I just want it to hit the onError() inside subscribe() File photoFile = new File(App.getAppContext().getCacheDir(), “userprofilepic_temp.jpg”); if(photoFile.isFile()) {//delete the file first if it exists otherwise the new file won’t be created photoFile.delete(); } photoFile.createNewFile(); //saves the file in the cache […]

无法删除旧的javaCompile操作,可能是类名已更改

我正在学习RxJava。 为此我在RxJava上关注了droidcon谈话video 。 教练为他正在使用的项目提供了回购链接 。 当我尝试在我的机器上构建项目时,我克隆了回购。 我收到这个错误 错误:无法删除旧的javaCompile操作,可能是类名已更改? 请提交错误报告,告知您正在使用的gradle版本。 这是gradle.build文件 buildscript { repositories { mavenCentral() } dependencies { classpath ‘me.tatarka:gradle-retrolambda:2.5.0’ } } repositories { mavenCentral() maven { url “https://github.com/alter-ego/advanced-android-logger/raw/develop/releases/” } } apply plugin: ‘retrolambda’ apply plugin: ‘com.android.application’ android { compileSdkVersion 23 buildToolsVersion “25.0.0” defaultConfig { applicationId “com.packtpub.apps.rxjava_essentials” minSdkVersion 16 targetSdkVersion 22 versionCode 1 versionName “1.0” […]

使用Rxjava Android从Activity更新片段

我有一个简单的用例,其中: Activity1创建一个fragment1 创建后的fragment1通知活动它是否已创建并更新其activity1视图。 获取通知更新fragment1视图后的activity1。 我正在使用rxandroid,sublibrary rxlifecycle组件和android,但我还在学习阶段,stackoverflow上甚rx-lifecycle没有rx-lifecycle标签,所以我仍然在努力理解这个库的流程.. 编辑 我不喜欢使用EventBus,就像每个人大喊大叫做某事一样,所以Rxjava Observable方法会很有用

测试RxBinding RxSearchView

背景 public Observable<List> search(SearchView searchView) { return RxSearchView.queryTextChanges(searchView) .filter(charSequence -> !TextUtils.isEmpty(charSequence)) .throttleLast(100, TimeUnit.MILLISECONDS) .debounce(200, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(AndroidSchedulers.mainThread()) .flatMap(this::performSearch) //Search the DB .onErrorResumeNext(this::doSomething); } 我试图使用AndroidJUnit4 runner和Mocktio测试上面的方法。 @Test public void testSearchCallsDataManager_WhenCalled() { String input = “abc”; when(mockSearchView.getQuery()).thenReturn(input); searchRequestManager.search(mockSearchView).subscribe(testSubscriber); //Using standard TestSubscriber testSubscriber.assertNoErrors(); testSubscriber.assertNotCompleted(); verify(mockDataManager).getFoos(input); } 问题 我尝试过使用mockSearchView和真正的SearchView 。 mockSearchView = mock(SearchView.class); searchView = new SearchView(InstrumentationRegistry.getContext(), null); searchView […]

RxJava + Retrofit – >用于API调用的BaseObservable,用于集中响应处理

我是RxJava的新手所以请原谅我,如果这听起来太新手了:-)。 截至目前,我有一个抽象的CallbackClass,它实现了Retofit Callback。 在那里我捕获了Callback的“onResponse”和“onError”方法,并在最终转发到自定义实现的方法之前处理各种错误types。 我还使用此集中式类来进行请求/响应应用程序日志记录和其他内容。 例如:对于来自我的服务器的特定错误代码,我在响应正文中收到一个新的Auth令牌,刷新令牌,然后clone.enqueue调用。 当然,我的服务器的响应还有其他一些全局行为。 目前的解决方案(没有Rx): public abstract void onResponse(Call call, Response response, boolean isSuccess); public abstract void onFailure(Call call, Response response, Throwable t, boolean isTimeout); @Override public void onResponse(Call call, Response response) { if (_isCanceled) return; if (response != null && !response.isSuccessful()) { if (response.code() == “SomeCode” && retryCount < RETRY_LIMIT) { […]