У меня есть фрагмент кода, работа которого заключается в обновлении локального кеша. Есть два триггера для этого обновления кеша:
- Через фиксированный интервал
- По запросу
Итак, вот базовый пример того, как я это сделал.
forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
.merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData(); // operation that may take long
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
Потом позже у меня
public void forceReload() {
final CountDownLatch cdl = new CountDownLatch(1);
dataUpdates
.take(1)
.subscribe(
new Action1<Boolean>() {
@Override
public void call(Boolean b) {
cdl.countDown();
}
}
);
forceReloadEvents.onNext(-1L);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Это работает, но проблема заключается в том, что когда я начинаю иметь несколько одновременных вызовов forceReload()
: одновременного выполнения reloadData()
не будет, но элементы будут стоять в очереди, и процесс будет зацикливаться на перезагрузке данных до тех пор, пока все события, отправленные на forceReloadEvents
, не будут использованы даже хотя forceReload()
уже завершено из-за предыдущих событий, выпустивших CountDownLatch
.
Я хотел использовать onBackPressureDrop
, но, похоже, нет индуцированного обратного давления и ничего не сбрасывается. Я бы хотел, чтобы какой-то способ принудительно противодавить, чтобы слияние понимало, что только один элемент может быть обработан за раз и что любое последующее событие должно быть отброшено до тех пор, пока не будет выполнено текущее выполнение.
Я тоже думал об использовании buffer
или throttleFirst
, но я не хочу устанавливать определенное время между каждым событием, и я бы предпочел, чтобы это автоматическое масштабирование зависело от времени, необходимого для перезагрузки кеша. Вы можете думать об этом как о throttleFirst
, пока не завершится reloadData
.