Некоторое время я боролся с тем, что я считаю довольно простым вопросом.
У меня есть Flowable
, который извлекает набор элементов из сети и отправляет их.
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retrieveNext();
while (item != null) {
emitter.onNext(item);
if (item.isLast) {
break;
}
item = retrieveNext();
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
// ...
}
});
Мне нужно, чтобы он работал на основе каждого запроса. То есть я сохраняю Subscription
и вызываю subscription.request(n)
при необходимости.
Статья о Backpressure (раздел "Реактивное притяжение") описывает точку зрения Observer
и лишь кратко упоминает тот
Однако для того, чтобы [реактивное извлечение] работало, Observables A и B должны правильно реагировать на request() ‹...> такая поддержка не является требованием для Observables
Он не вдается в подробности относительно того, как такая поддержка в принципе может быть осуществлена.
Есть ли общий шаблон для реализации таких Observable
/Flowable
? Я не нашел хороший пример или ссылку. Один плохой вариант, который я могу придумать, — это иметь монитор, который я бы заблокировал внутри моего цикла while
, когда emitter.requested() == 0
, и разблокировал бы в doOnRequest()
из другого потока. Но такой подход кажется громоздким.
Так как же вообще достигается это "замедление"?
Спасибо.