使用RxJava惰性读取分页对象

我几乎卖给了RxJava,这是一个改进的完美伴侣,但我正在努力进入一个常见的模式,同时迁移我的代码:为了节省带宽,我想懒惰地从我的web服务(分页)对象根据需要,而我的listview(或recyclerview)滚动使用反应式编程。

我以前的代码完美地完成了这项工作,但是反应式编程似乎值得尝试。

听ListView / recyclerview滚动(和其他无聊的东西) 是不是关心和获取一个Observable很容易使用Retrofit:

@GET("/api/messages") Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit); 

我只是无法弄清楚在反应式编程中使用的模式。

Concat算子似乎是一个很好的起点, ConnectableObservable在某些时候推迟了排放,也许是flatMap ,但是怎么样呢?

编辑:

这是我目前(天真)的解决scheme:

 public interface Paged<T> { boolean isLoading(); void cancel(); void next(int count); void next(int count, Scheduler scheduler); Observable<List<T>> asObservable(); boolean hasCompleted(); int position(); } 

而我的实现使用一个主题:

 public abstract class SimplePaged<T> implements Paged<T> { final PublishSubject<List<T>> subject = PublishSubject.create(); private volatile boolean loading; private volatile int offset; private Subscription subscription; @Override public boolean isLoading() { return loading; } @Override public synchronized void cancel() { if(subscription != null && !subscription.isUnsubscribed()) subscription.unsubscribe(); if(!hasCompleted()) subject.onCompleted(); subscription = null; loading = false; } @Override public void next(int count) { next(count, null); } @Override public synchronized void next(int count, Scheduler scheduler) { if (isLoading()) throw new IllegalStateException("you can't call next() before onNext()"); if(hasCompleted()) throw new IllegalStateException("you can't call next() after onCompleted()"); loading = true; Observable<List<T>> obs = onNextPage(offset, count).single(); if(scheduler != null) obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler! subscription = obs.subscribe(this::onNext, this::onError, this::onComplete); } @Override public Observable<List<T>> asObservable() { return subject.asObservable(); } @Override public boolean hasCompleted() { return subject.hasCompleted(); } @Override public int position() { return offset; } /* Warning: functions below may be called from another thread */ protected synchronized void onNext(List<T> items) { if (items != null) offset += items.size(); loading = false; if (items == null || items.size() == 0) subject.onCompleted(); else subject.onNext(items); } protected synchronized void onError(Throwable t) { loading = false; subject.onError(t); } protected synchronized void onComplete() { loading = false; } abstract protected Observable<List<T>> onNextPage(int offset, int count); } 

Solutions Collecting From Web of "使用RxJava惰性读取分页对象"

以下是处理反应式分页的几种潜在方法之一。 假设我们有一个方法getNextPageTrigger ,它返回一个Observable当滚动监听器(或任何input)想要加载一个新页面时,它会发出一些事件对象。 在现实生活中,它可能会有debounce操作符,但除此之外,我们将确保我们只在最新的页面加载后触发它。

我们还定义了一个从列表中解开消息的方法:

 Observable<Message> getPage(final int page) { return service.getMessages(page * PAGE_SIZE, PAGE_SIZE) .flatMap(messageList -> Observable.from(messageList)); } 

然后,我们可以制定实际的获取逻辑:

 // Start with the first page. getPage(0) // Add on each incremental future page. .concatWith(Observable.range(1, Integer.MAX_VALUE) // Uses a little trick to get the next page to wait for a signal to load. // By ignoring all actual elements emitted and casting, the trigger must // complete before the actual page request will be made. .concatMap(page -> getNextPageTrigger().limit(1) .ignoreElements() .cast(Message.class) .concatWith(getPage(page))) // Then subscribe, etc.. 

这仍然缺less一些潜在的重要的事情:

1 – 这显然不知道何时停止获取额外的页面,这意味着一旦达到最后,取决于服务器返回什么,它可能会保持击中错误或空的结果每次滚动触发。 解决这个问题的方法取决于你如何告诉客户端没有更多的页面加载。

2 – 如果你需要错误重试,我会build议看看retryWhen操作符。 否则,常见的networking错误可能会导致页面加载错误传播。