Rxjava Android如何使用Zip运算符

我有很多理解RxJava中的zip运算符为我的android项目。 问题我需要能够发送一个networking请求上传一个video然后我需要发送一个networking请求上传一张图片去与它最后我需要添加一个描述,并使用前两个请求的响应上传video和图片的位置url以及描述到我的服务器。

我认为,zip操作员对于这个任务是完美的,因为我知道我们可以采取两个可观察到的事件(video和图片请求)的回应,并将它们用于我的最终任务。 但我似乎不能得到这个发生如何我想象。

我正在寻找一个人来回答一下伪代码的概念。 谢谢

Solutions Collecting From Web of "Rxjava Android如何使用Zip运算符"

Zip运算符严格将可观察对象的发射项目对。 它等待两个(或更多)项目到达然后合并它们。 所以,这是适合您的需求。

我将使用Func2来链接前两个观测值的结果。 注意,如果使用Retrofit,这个方法会更简单,因为它的api接口可能会返回一个observable。 否则,你将需要创build自己的observable。

 // assuming each observable returns response in the form of String Observable<String> movOb = Observable.create(...); // if you use Retrofit Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...), Observable.zip(movOb, picOb, new Func2<String, String, MyResult>() { @Override public MyResult call(String movieUploadResponse, String picUploadResponse) { // analyze both responses, upload them to another server // and return this method with a MyResult type return myResult; } } ) // continue chaining this observable with subscriber // or use it for something else 

一步一步: – >首先定义我们的Retrofit对象来访问Github的API,然后为上面的两个networking请求设置两个observable:

 Retrofit repo = new Retrofit.Builder() .baseUrl("https://api.github.com") .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); Observable<JsonObject> userObservable = repo .create(GitHubUser.class) .getUser(loginName) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); Observable<JsonArray> eventsObservable = repo .create(GitHubEvents.class) .listEvents(loginName) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); 

改造界面非常简单:

 public interface GitHubUser { @GET("users/{user}") Observable<JsonObject> getUser(@Path("user") String user); } public interface GitHubEvents { @GET("users/{user}/events") Observable<JsonArray> listEvents(@Path("user") String user); } 

最近,我们使用RxJava的zip方法来结合我们的两个Observable并等待它们完成,然后创build一个新的Observable。

 Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() { @Override public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) { return new UserAndEvents(jsonObject, jsonElements); } }); 

什么是UserAndEvents? 这只是一个简单的POJO来结合这两个对象:

 public class UserAndEvents { public UserAndEvents(JsonObject user, JsonArray events) { this.events = events; this.user = user; } public JsonArray events; public JsonObject user; } 

最后让我们在我们新的组合的Observable上调用订阅方法:

 combined.subscribe(new Subscriber<UserAndEvents>() { ... @Override public void onNext(UserAndEvents o) { // You can access the results of the // two observabes via the POJO now } }); 

一个小例子 :

 Observable<String> stringObservable1 = Observable.just("Hello", "World"); Observable<String> stringObservable2 = Observable.just("Bye", "Friends"); Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() { @Override public String apply(@NonNull String s, @NonNull String s2) throws Exception { return s + " - " + s2; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }); 

这将打印:

 Hello - Bye World - Friends 

在这里,我有一个例子,我使用Zip以asynchronous的方式,以防万一,你好奇

  /** * Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel. * By default Rx is not async, only if you explicitly use subscribeOn. */ @Test public void testAsyncZip() { scheduler = Schedulers.newThread(); scheduler1 = Schedulers.newThread(); scheduler2 = Schedulers.newThread(); long start = System.currentTimeMillis(); Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) .concat(s3)) .subscribe(result -> showResult("Async in:", start, result)); } /** * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline */ @Test public void testZip() { long start = System.currentTimeMillis(); Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) .concat(s3)) .subscribe(result -> showResult("Sync in:", start, result)); } public void showResult(String transactionType, long start, String result) { System.out.println(result + " " + transactionType + String.valueOf(System.currentTimeMillis() - start)); } public Observable<String> obString() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "Hello"); } public Observable<String> obString1() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> " World"); } public Observable<String> obString2() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "!"); } public Observable<String> obAsyncString() { return Observable.just("") .observeOn(scheduler) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "Hello"); } public Observable<String> obAsyncString1() { return Observable.just("") .observeOn(scheduler1) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> " World"); } public Observable<String> obAsyncString2() { return Observable.just("") .observeOn(scheduler2) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "!"); } 

你可以在这里看到更多的例子https://github.com/politrons/reactive

我一直在寻找一个关于如何使用Zip操作符的简单答案,以及如何处理我创build的Observable以将它们传递给它,我想知道是否应该为每个observable调用subscribe()或不是这些答案很容易find,我必须找出我自己,所以这里是一个简单的例子,在2个Observable上使用Zip操作符:

 @Test public void zipOperator() throws Exception { List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4); List<String> letters = Arrays.asList("a", "b", "c", "d", "e"); Observable<Integer> indexesObservable = Observable.fromIterable(indexes); Observable<String> lettersObservable = Observable.fromIterable(letters); Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems()) .subscribe(printMergedItems()); } @NonNull private BiFunction<Integer, String, String> mergeEmittedItems() { return new BiFunction<Integer, String, String>() { @Override public String apply(Integer index, String letter) throws Exception { return "[" + index + "] " + letter; } }; } @NonNull private Consumer<String> printMergedItems() { return new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }; } 

打印结果是:

 [0] a [1] b [2] c [3] d [4] e 

在我脑海中的问题的最终答案如下

传递给zip()方法的Observable只需要创build,他们不需要有任何订阅者,只有创build它们就足够了…如果你想让任何observable在一个调度器上运行,你可以指定这个对于那个Observable …我也试过了Observable的zip()运算符,他们应该等待那里的结果,而zip()的消耗品只有在两个结果准备好的时候才触发(这是预期的行为)

zip运算符允许您从两个不同的可观察结果中编写结果。

你将不得不给予lambda,这将创build从每个observable发出的数据结果。

 Observable<MovieResponse> movies = ... Observable<PictureResponse> picture = ... Observable<Response> response = movies.zipWith(picture, (movie, pic) -> { return new Response("description", movie.getName(), pic.getUrl()); });