Articles of rx java

为什么在Android上使用Retrofit的RxJava doOnError()不起作用,但是Subscriber onError会起作用

谁能解释我为什么这样的代码: networApi.getList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> { throwable.getMessage(); }) .doOnNext(list -> { coursesView.populateRecyclerView(list); courseList = (List) courses; }).subscribe(); 如果没有互联网进入doOnError,但会进一步抛出,以便应用程序关闭,但代码如下: networkApi.getList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.getMessage(); } @Override public void onNext(List list) { coursesView.populateRecyclerView(list); courseList = (List) list; } }); 按照我的预期工作,这意味着当没有互联网连接时,它什么都不做 欢呼Wojtek

如何在RxJava中的Observable中处理map()中的exception

我想做这个: Observable.just(bitmap) .map(new Func1() { @Override public File call(Bitmap photoBitmap) { //File creation throws IOException, //I just want it to hit the onError() inside subscribe() File photoFile = new File(App.getAppContext().getCacheDir(), “userprofilepic_temp.jpg”); if(photoFile.isFile()) {//delete the file first if it exists otherwise the new file won’t be created photoFile.delete(); } photoFile.createNewFile(); //saves the file in the cache […]

IllegalArgumentException:找不到rx.Observable RxJava,Retrofit2的调用适配器

我在调用其余的api时遇到上述错误。 我正在使用retrofit2和RxJava。 ServiceFactory.java public class ServiceFactory { public static T createRetrofitService(final Class clazz, final String endpoint){ Retrofit retrofit = new Retrofit.Builder() .baseUrl(endpoint) //.addConverterFactory(GsonConverterFactory.create()) .build(); T service = retrofit.create(clazz); return service; } } MovieService.java public interface MovieService{ //public final String API_KEY = “”; public final String SERVICE_END = “https://api.mymovies.org/3/”; @GET(“movie/{movieId}??api_key=xyz”) Observable<Response> getMovies(@Field(“movieId”) int movieId); } 内部MainActivity […]

在Retrofit + RxJava中链接请求

我有2个API,我想按顺序请求并将它们的数据存储在SQLite中。 首先,我想向API A发出请求并将其数据存储在SQL表a 。 然后向API B发出请求并将其数据存储在表b并将一些数据存储在表a_b 。 存储在a_b的数据仅来自请求B 我怎么能用RxJava做到这一点。 我在这里读到了关于使用flatMap的地方,就像这样 apiService.A() // store in DB here? How? maybe use map()? .flatMap(modelA -> { // or maybe store modelA in DB here? return apiService.B().map(modelB -> { storeInDB()l // store B here ? return modelB; }); }); 如果我没有使用lambda函数,这看起来就像普通的嵌套调用一样难看。 这是一个更好的方法吗?

Android中的RxJava异步任务

我试图在Android中使用RxJava实现异步任务。 我尝试了以下代码,但它没有用。 它在UI线程上执行。 我使用的是以下版本的RxAndroid 0.24.0。 try { Observable.just(someMethodWhichThrowsException()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(s -> onMergeComplete()); } catch (IOException e) { e.printStackTrace(); } 但是,以下内容对我来说是异步的。 Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { try { someMethodWhichThrowsException(); } catch (IOException e) { e.printStackTrace(); } subscriber.onCompleted(); } }); observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(); 我想了解以下内容:1。它们之间有什么区别? 2.创建异步任务时的最佳做法是什么? 提前致谢。

如何在使用rxjava的android延迟后调用一个方法?

我试图将我的Handler方法替换为Rx java。 我的要求 我想在5秒后调用方法getTransactionDetails()。 这是我使用Handler的工作代码 new Handler().postDelayed(new Runnable() { @Override public void run() { getTransactionDetails(); } }, 5000); Rx java代码 – 它不起作用 Observable.empty().delay(5000, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(o -> getTransactionDetails()) .subscribe();

无法删除旧的javaCompile操作,可能是类名已更改

我正在学习RxJava。 为此我在RxJava上关注了droidcon谈话video 。 教练为他正在使用的项目提供了回购链接 。 当我尝试在我的机器上构建项目时,我克隆了回购。 我收到这个错误 错误:无法删除旧的javaCompile操作,可能是类名已更改? 请提交错误报告,告知您正在使用的gradle版本。 这是gradle.build文件 buildscript { repositories { mavenCentral() } dependencies { classpath ‘me.tatarka:gradle-retrolambda:2.5.0’ } } repositories { mavenCentral() maven { url “https://github.com/alter-ego/advanced-android-logger/raw/develop/releases/” } } apply plugin: ‘retrolambda’ apply plugin: ‘com.android.application’ android { compileSdkVersion 23 buildToolsVersion “25.0.0” defaultConfig { applicationId “com.packtpub.apps.rxjava_essentials” minSdkVersion 16 targetSdkVersion 22 versionCode 1 versionName “1.0” […]

RxJava Observable.cache无效

我想在Android环境中学习rxjava。 假设我有一个可以发出网络调用结果的observable。 如果我理解正确,处理配置更改的一种常见方法是: 将observable存储在保留的片段/单例/应用程序对象中 将缓存运算符应用于observable 在适当的生命周期处理程序中订阅/取消订阅 这样做,我们不会松开一旦新配置发生后将重新观察的observable的结果。 现在,我的问题是: 有没有办法强制observable发出一个新值(并使缓存的值无效)? 每当我想要来自网络的新数据时,我是否需要创建一个新的observable(这听起来不像Android世界中的坏习惯,因为这会使gc做额外的工作)? 非常感谢, 费德里科

Rx Observable定期发射值

我必须定期轮询一些RESTful端点以刷新我的Android应用程序的数据。 我还必须根据连接暂停和恢复它(如果手机处于离线状态,则无需尝试)。 我目前的解决方案是有效的,但它使用标准的Java的ScheduledExecutorService来执行定期任务,但我想留在Rx范例中。 这是我当前的代码,为简洁起见,部分代码被跳过。 userProfileObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber subscriber) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); final Runnable runnable = new Runnable() { @Override public void run() { // making http request here } }; final List<ScheduledFuture> futures = new ArrayList<ScheduledFuture>(1); networkStatusObservable.subscribe(new Action1() { @Override public void call(Boolean networkAvailable) { […]

RXJava – 创建一个可以观察的可观察对象(例如缓冲区和窗口)

我想创建以下内容的observable: 缓冲所有项目,同时暂停 立即发出物品,而不是暂停 暂停/恢复触发器必须来自另一个可观察者 它必须保存以供不在主线程上运行的observable使用,并且必须保存才能从主线程更改暂停/恢复状态 我想使用BehaviorSubject作为触发器,并将此触发器绑定到活动的onResume和onPause事件。 (附加代码示例) 题 我已经设置了一些东西,但它没有按预期工作。 我用它如下: Observable o = …; // Variant 1 o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) // Variant 2 // o = o.compose(RXPauser.applyPauser(getPauser())); o .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); 目前的问题是,Variant 1应该可以正常工作,但有时,事件只是没有发出 – 阀门没有发射,直到阀门一切正常工作(可能是一个穿线问题……)! 解决方案2更简单,似乎有效,但我不确定它是否真的更好,我不这么认为。 我实际上不确定,为什么解决方案有时会失败所以我不确定解决方案2是否解决了(目前为我未知)问题… 有人可以告诉我可能是什么问题或者简单的解决方案应该可靠地工作吗? 或者给我一个可靠的解决方案? 码 RxValue https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08 RXPauserfunction public static Observable.Transformer applyPauser(Observable pauser) { return observable -> […]