Как я могу заставить какое-то обратное давление избежать многократного выполнения в rxjava?

У меня есть фрагмент кода, работа которого заключается в обновлении локального кеша. Есть два триггера для этого обновления кеша:

  1. Через фиксированный интервал
  2. По запросу

Итак, вот базовый пример того, как я это сделал.

forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
    .merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
    .flatMap(new Func1<Long, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Long t) {
            return reloadData(); // operation that may take long
        }
    })
    .publish();

dataUpdates.subscribe();
dataUpdates.connect();

Потом позже у меня

public void forceReload() {
    final CountDownLatch cdl = new CountDownLatch(1);

    dataUpdates
        .take(1)
        .subscribe(
            new Action1<Boolean>() {
                @Override
                public void call(Boolean b) {
                    cdl.countDown();
                }
            }
        );

    forceReloadEvents.onNext(-1L);

    try {
        cdl.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Это работает, но проблема заключается в том, что когда я начинаю иметь несколько одновременных вызовов forceReload(): одновременного выполнения reloadData() не будет, но элементы будут стоять в очереди, и процесс будет зацикливаться на перезагрузке данных до тех пор, пока все события, отправленные на forceReloadEvents, не будут использованы даже хотя forceReload() уже завершено из-за предыдущих событий, выпустивших CountDownLatch.

Я хотел использовать onBackPressureDrop, но, похоже, нет индуцированного обратного давления и ничего не сбрасывается. Я бы хотел, чтобы какой-то способ принудительно противодавить, чтобы слияние понимало, что только один элемент может быть обработан за раз и что любое последующее событие должно быть отброшено до тех пор, пока не будет выполнено текущее выполнение.

Я тоже думал об использовании buffer или throttleFirst, но я не хочу устанавливать определенное время между каждым событием, и я бы предпочел, чтобы это автоматическое масштабирование зависело от времени, необходимого для перезагрузки кеша. Вы можете думать об этом как о throttleFirst, пока не завершится reloadData.


person Crystark    schedule 15.06.2015    source источник


Ответы (2)


Изменить: судя по комментариям, вы можете использовать AtomicBoolean в качестве ворот в flatMap, чтобы не запускать перезагрузку, пока врата снова не откроются:

public class AvoidReloadStorm {
    static Observable<Boolean> reload() {
        return Observable.just(true)
        .doOnNext(v -> System.out.println("Reload started..."))
        .delay(10, TimeUnit.SECONDS)
        .doOnNext(v -> System.out.println("Reloaded"));
    }
    public static void main(String[] args) throws Exception {
        Subject<Long, Long> manual = PublishSubject.<Long>create().toSerialized();
        Observable<Long> timer = Observable.timer(0, 5, TimeUnit.SECONDS)
                .doOnNext(v -> System.out.println("Timer reload"));

        AtomicBoolean running = new AtomicBoolean();

        ConnectableObservable<Boolean> src = Observable
        .merge(manual.onBackpressureDrop(), timer.onBackpressureDrop())
        .observeOn(Schedulers.io())
        .flatMap(v -> {
            if (running.compareAndSet(false, true)) {
                return reload().doOnCompleted(() -> {
                    running.set(false);
                });
            }
            System.out.println("Reload rejected");
            return Observable.empty();
        }).publish();

        src.subscribe(System.out::println);

        src.connect();

        Thread.sleep(100000);
    }
}
person akarnokd    schedule 15.06.2015
comment
Я не обязательно хочу сопоставлять onNext с сообщением об успешной перезагрузке. Я просто хочу убедиться, что я получил успешную перезагрузку после, когда я вызвал onNext, даже если он был запущен в данный момент, когда я сделал вызов onNext. throttleFirst беспокоит меня, так как если перезагрузка занимает, например, всего 500 мс, тогда все вызовы onNext между 500-м и 1000-м миллисекундами не будут иметь никакого эффекта, и forceReload будет ждать следующего события до того, как CDL будет выпущен. - person Crystark; 15.06.2015
comment
Я вставил дроссель в первую очередь, чтобы избежать бури перезагрузки, но вам не обязательно его использовать. - person akarnokd; 15.06.2015
comment
Ну, это именно то, чего я хочу избежать: перезагрузить штормы. Но без определения конкретного таймера, в течение которого следует избегать перезагрузки, как это делает throttleFirst. Я хотел бы, чтобы flatMap генерировал противодавление всякий раз, когда он получает событие во время выполнения reloadData. Это также можно интерпретировать как дросселирование, пока не закончится текущий reloadData. - person Crystark; 15.06.2015
comment
Добавления AtomicBoolean недостаточно. Когда я делаю observeOn, кажется, что все ставится в очередь, поэтому ничего не отклоняется. Вы показали мне правильный путь, и я получил решение, работающее с использованием .filter() до observeOn. Смотрите мой ответ, который я опубликую через секунду. - person Crystark; 16.06.2015
comment
Если я запускаю свой пример, он печатает отклонено, как и ожидалось. - person akarnokd; 16.06.2015
comment
Странно, может быть, это потому, что вы не используете ручные события, а только таймер? Я не получал отказов в отправке 30 одновременных ручных событий. - person Crystark; 16.06.2015

Я сделал эту работу благодаря akarnokd!

Вот решение, которое я создал на основе его ответа:

Observable<Long> forceReloadEvents = this.forceReloadEvents
    .asObservable()
    .onBackpressureDrop();

Observable<Long> periodicReload = Observable
    .timer(0, pullInterval, TimeUnit.SECONDS)
    .onBackpressureDrop();

final AtomicBoolean running = new AtomicBoolean();

dataUpdates = Observable
    .merge(forceReloadEvents, periodicReload)
    .filter(new Func1<Long, Boolean>() {
        @Override
        public Boolean call(Long t) {
            return running.compareAndSet(false, true);
        }
    })
    .observeOn(Schedulers.io())
    .flatMap(new Func1<Long, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Long t) {
            return reloadData();
        }
    })
    .doOnNext(new Action1<Boolean>() {
        @Override
        public void call(Boolean t) {
            running.set(false);
        }
    })
    .publish();

dataUpdates.subscribe();
dataUpdates.connect();

Я не уверен, что onBackpressureDrop полезен здесь, но я установил его в качестве меры предосторожности.

Код forceReload не меняется.

person Crystark    schedule 16.06.2015