在Android中使用RxJava排队任务

我正在开发具有后台数据同步function的Android应用程序。 我目前正在使用RxJava定期在服务器上发布一些数据。 除此之外,我想为用户提供一个“强制同步”按钮,它会立即触发同步。 我知道如何使用Observable.interval()以固定的时间间隔推送数据,我知道如何使用Observalbe.just()来推送那个被强制的数据,但我想将它们排队,如果是这样的话当前一个仍在运行时触发一个。

所以让我们举个例子,1min是自动同步的间隔,让我们说同步持续40秒(我夸张这里只是为了更容易点)。 现在,如果有任何机会,用户在自动仍然运行时按下“强制”按钮(反之亦然 – 当强制一个仍在运行时自动触发),我想将第二个同步请求排队第一个完成。

我画了这张图片,可能会有更多的视角:

在此处输入图像描述

如您所见,自动被触发(通过一些Observable.interval() ),并且在同步过程中,用户按下“强制”按钮。 现在我们要等待第一个请求完成,然后再次启动强制请求。 有一次,当强制请求正在运行时,再次触发了新的自动请求,只是将其添加到队列中。 从队列中完成最后一个后,一切都停止,然后稍后再次安排自动。

希望有人能指出我纠正操作员如何做到这一点。 我已经尝试使用Observable.combineLatest() ,但是队列列表在开始时被调度,当我将新的同步添加到队列时,它在前一个操作完成时没有继续。

Darko,非常感谢任何帮助

  • 改造/ Rxjava和基于会话的服务
  • 使用Retrofit和RX-java处理没有networking的正确方法
  • 在项目中find缺少的onError
  • RxJava- RxAndroid在动态EditText上进行表单validation
  • rxandroid要求在ui线程上运行,即使它是在AndroidSchedulers.mainThread()上订阅的
  • RxJava:找出BehaviorSubject是否是重复的值
  • 处理networking错误与改造可观察
  • 是否有可能在rxjava android订户的onNext()中获得2个值?
  • 您可以通过将计时器与按钮单击Observable / Subject合并,使用onBackpressureBuffer的排队效果并将处理concatMap放入其中来确保一次运行一个。

     PublishSubject subject = PublishSubject.create(); Observable periodic = Observable.interval(1, 1, TimeUnit.SECONDS); periodic.mergeWith(subject) .onBackpressureBuffer() .concatMap(new Func1>() { @Override public Observable call(Long v) { // simulates the task to run return Observable.just(1) .delay(300, TimeUnit.MILLISECONDS); } } ).subscribe(System.out::println, Throwable::printStackTrace); Thread.sleep(1100); // user clicks a button subject.onNext(-1L); Thread.sleep(800); 

    虽然有一个好的解决方案可以接受的答案,但我想分享使用Scheduler和SingleThreadExecutor执行此操作的另一个选项

     public static void main(String[] args) throws Exception { System.out.println(" init "); Observable numberObservable = Observable.interval(700, TimeUnit.MILLISECONDS).take(10); final Subject subject = PublishSubject.create(); Executor executor = Executors.newSingleThreadExecutor(); Scheduler scheduler = Schedulers.from(executor); numberObservable.observeOn(scheduler).subscribe(subject); subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"), onCompleteFunc("subscriber 1")); Thread.sleep(800); //simulate action executor.execute(new Runnable() { @Override public void run() { subject.onNext(333l); } }); Thread.sleep(5000); } static Action1 onNextFunc(final String who) { return new Action1() { public void call(Long x) { System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName() + " -- " + System.currentTimeMillis()); try { //simulate some work Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }; } static Action1 onErrorFunc(final String who) { return new Action1() { public void call(Throwable t) { t.printStackTrace(); } }; } static Action0 onCompleteFunc(final String who) { return new Action0() { public void call() { System.out.println(who + " complete"); } }; }