Articles of rx java

Android Rxjava订阅了variables

我正在学习观察者模式,我希望我的观察者在改变它的值并做一些操作时跟踪某个variables,我做了类似的事情: public class Test extends MyChildActivity { private int VARIABLE_TO_OBSERVE = 0; Observable mObservable = Observable.just(VARIABLE_TO_OBSERVE); protected void onCreate() {/*onCreate method*/ super(); setContentView(); method(); changeVariable(); } public void changeVariable() { VARIABLE_TO_OBSERVE = 1; } public void method() { mObservable.map(value -> { if (value == 1) doMethod2(); return String.valueOf(value); }).subScribe(string -> System.out.println(string)); } public void doMethod2() […]

使用RxJava和Okhttp

我想在另一个线程(如IO线程)中使用okhttp请求一个url并在Android主线程中获取Response ,但我不知道如何创建一个Observable 。

将AsyncTask转换为RxAndroid

我有以下方法使用otto和AsyncTask发布对UI的响应。 private static void onGetLatestStoryCollectionSuccess(final StoryCollection storyCollection, final Bus bus) { new AsyncTask() { @Override protected Void doInBackground(Void… params) { bus.post(new LatestStoryCollectionResponse(storyCollection)); return null; } }.execute(); } 我需要帮助使用RxAndroid库将此AsyncTask转换为RxJava 。

如何在使用Retrofit 2的unit testing期间创建retrofit.Response对象

在使用RxJava和Retrofit 2时,我正在尝试创建unit testing以覆盖我的应用程序何时收到特定响应。 我的问题是,使用Retrofit 2,我看不到一个很好的方法来创建一个不使用reflection的retrofit.Response对象。 @Test public void testLogin_throwsLoginBadRequestExceptionWhen403Error() { Request.Builder requestBuilder = new Request.Builder(); requestBuilder.get(); requestBuilder.url(“http://localhost”); Response.Builder responseBuilder = new Response.Builder(); responseBuilder.code(403); responseBuilder.protocol(Protocol.HTTP_1_1); responseBuilder.body(ResponseBody.create(MediaType.parse(“application/json”), “{\”key\”:[\”somestuff\”]}”)); responseBuilder.request(requestBuilder.build()); retrofit.Response aResponse = null; try { Constructor constructor= (Constructor) retrofit.Response.class.getDeclaredConstructors()[0]; constructor.setAccessible(true); aResponse = constructor.newInstance(responseBuilder.build(), null, null); } catch (Exception ex) { //reflection error } doReturn(Observable.just(aResponse)).when(mockLoginAPI).login(anyObject()); TestSubscriber testSubscriber = […]

如何正确处理RxJava(Android)中的onError?

我正在获取设备上已安装应用的列表。 这是一个代价高昂的操作,所以我正在使用Rx: Observable observable = Observable.create(subscriber -> { List result = getUserApps(); subscriber.onNext(result); subscriber.onError(new Throwable()); subscriber.onCompleted(); }); observable .map(s -> { ArrayList list = new ArrayList(); ArrayList applist = new ArrayList(); for (Application p : (ArrayList) s) { list.add(p.getAppName()); applist.add(p); } return applist; }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> Le(TAG, “Throwable ” + throwable.getMessage())) .subscribe(s -> […]

使用rxjava全局处理网络exception进行改造

我试图在全局级别处理应用程序中的exception,因此改造会抛出错误,我会在某个特定的类中捕获它,并使用逻辑来处理这些错误。 我有一个界面 @POST(“/token”) AuthToken refreshToken(@Field(“grant_type”) String grantType, @Field(“refresh_token”) String refreshToken); 和可观察的 /** * Refreshes auth token * * @param refreshToken * @return */ public Observable refreshToken(String refreshToken) { return Observable.create((Subscriber subscriber) -> { try { subscriber.onNext(apiManager.refreshToken(REFRESH_TOKEN, refreshToken)); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } }).subscribeOn(Schedulers.io()); } 当我从服务器获得401(无效令牌或其他一些网络相关的错误)时,我想刷新令牌并重复其余的呼叫。 是否有一种方法可以使用rxjava对所有其余的调用使用某种可观察的全局捕获此错误,处理它并重复抛出它的调用? 现在我使用subject来捕获.subscribe()这样的错误 private static BehaviorSubject errorEvent = […]

Rxjava和Volley请求

我的问题应该听起来像傻瓜,但我只是从Asynktask跳到RxJava 。 所以: 可以使用RxJava Observable和Volley Requests吗?这意味着,使用未来的请求。 我问这个问题,因为像改造这样的另一个httpClient使用RxJava非常好,但是个人喜欢Volley,所以它可能吗? 编辑 基于第一个答案,我知道它是可能的。 你能分享一些展示如何做到这一点的样本吗?

RxJava + Retrofit长轮询

我的问题是我无法通过Retrofit得到无限的流。 在我获得初始poll()请求的凭据后 – 我执行初始poll()请求。 如果没有变化,则每个poll()请求在25秒内响应,如果有任何变化,则每个poll()请求更早 – 返回changed_data []。 每个响应都包含下一个轮询请求所需的timestamp数据 – 我应该在每个poll()响应之后执行新的poll()请求。 这是我的代码: getServerApi().getLongPollServer() .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll(“a_check”, Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), “”) .take(1) .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll(“a_check”, Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), “”))) .retry() .subscribe(longPollEnvelope1 -> { processUpdates(longPollEnvelope1.getUpdates()); }); 我是RxJava的新手,也许我不懂东西,但我无法获得无限的流。 我接到3个电话,然后是onNext和onComplete。 PS也许有更好的解决方案在Android上实现长轮询?

使用Retrofit observable处理网络错误

使用Obtrables with Retrofit时如何处理网络故障? 鉴于此代码: Observable observable = api.getApiService().getMyData(); observable .doOnNext(new Action1() { @Override public void call(GetJobResponse getJobResponse) { //do stuff with my data } }) .doOnError(new Action1() { @Override public void call(Throwable throwable) { //do stuff with error message } }); 如果没有网络,请求就会失败,并且不会调用onError。 它不会崩溃,但会无声地失败。 日志显示Retrofit获取错误: java.net.UnknownHostException: Unable to resolve host “api-staging.sittercity.com”: No address associated with hostname […]

使用RXJava和Retrofit获取标题信息

我正在尝试转换我目前使用Retrofit的应用程序,以使用RX Java。 为了处理分页,我传统上是从响应头中获取nextPage URL。 @Override public void success(Assignment assignment, Response response) { response.getHeaders(); // Do stuff with header info } 但是,由于切换到RX Java,我不知道如何从我的改装调用中获取响应信息。 @GET(“/{item_id}/users”) Observable<List> getObjects(@Path(“object_id”) long object_id); @GET(“/{next}”) Observable<List> getNextPageObjects(@Path(“next”) String nextURL); 有没有办法让我的改装调用返回我的标题信息和我的types对象?