Наблюдаемая поддерживающая реактивная тяга

Некоторое время я боролся с тем, что я считаю довольно простым вопросом.

У меня есть Flowable, который извлекает набор элементов из сети и отправляет их.

Flowable
    .create(new FlowableOnSubscribe<Item>() {
        @Override
        public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
            Item item = retrieveNext();

            while (item != null) {
                emitter.onNext(item);

                if (item.isLast) {
                    break;
                }

                item = retrieveNext();
            }

            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
    .doOnRequest(new LongConsumer() {
        @Override
        public void accept(long t) throws Exception {
            // ...
        }
    });

Мне нужно, чтобы он работал на основе каждого запроса. То есть я сохраняю Subscription и вызываю subscription.request(n) при необходимости.

Статья о Backpressure (раздел "Реактивное притяжение") описывает точку зрения Observer и лишь кратко упоминает тот

Однако для того, чтобы [реактивное извлечение] работало, Observables A и B должны правильно реагировать на request() ‹...> такая поддержка не является требованием для Observables

Он не вдается в подробности относительно того, как такая поддержка в принципе может быть осуществлена.

Есть ли общий шаблон для реализации таких Observable/Flowable? Я не нашел хороший пример или ссылку. Один плохой вариант, который я могу придумать, — это иметь монитор, который я бы заблокировал внутри моего цикла while, когда emitter.requested() == 0, и разблокировал бы в doOnRequest() из другого потока. Но такой подход кажется громоздким.

Так как же вообще достигается это "замедление"?

Спасибо.


person SqueezyMo    schedule 28.02.2017    source источник
comment
Чтобы уточнить, написание операторов для 2.x описано в вики.   -  person akarnokd    schedule 01.03.2017


Ответы (1)


Используйте 1_:

Flowable.generate(
  () -> generateItemRetriever(),
  (Retriever<Item> retriever, Emitter<Item> emitter) -> {
    Item item = retriever.retrieveNext();
    if(item!=null && !item.isLast()) {
      emitter.onNext(item);
    } else {
      emitter.onComplete();
    }
  }
)

Thgenerated Flowable будет знать об обратном давлении и отказе от подписки. Если у вашего Retriever есть состояние, которое необходимо удалить, используйте перегрузку generate с тремя аргументами.

Однако, если вы выполняете HTTP-вызовы, я бы посоветовал изучить что-то вроде Retrofit, которое очень хорошо сочетается с RxJava.

person Tassos Bassoukos    schedule 28.02.2017