Использование диапазона в zipWith также выдает все элементы из последовательности диапазона до применения функции застежки-молнии.

Вопрос про RxJava2.

Заметил, что при сжатии Throwable из retryWhen с помощью range выбрасываются все элементы из Observable.range до того, как была применена функция застегивания. Кроме того, range выдает последовательность, даже если zipWith не вызывалась. Например, этот исходный код

Observable.create<String> {
        println("subscribing")
        it.onError(RuntimeException("always fails"))
    }
    .retryWhen {
        it.zipWith(Observable.range(1, 3).doOnNext { println("range $it") },
                BiFunction { t: Throwable, i: Int -> i })
                .flatMap {
                    System.out.println("delay retry by $it + second(s)")
                    Observable.timer(it.toLong(), TimeUnit.SECONDS)
                }
    }./*subscribe*/

дает следующий результат

 range 1
 range 2
 range 3
 subscribing
 delay retry by 1 + second(s)
 subscribing
 delay retry by 2 + second(s)
 subscribing
 delay retry by 3 + second(s)
 subscribing
 onComplete

Замена onError в создании observable также не устраняет создание range элементов. Итак, вопрос в том, почему это происходит, когда Range холодно.


person Sunstrike    schedule 24.08.2017    source источник
comment
Поскольку flatMap выполняет предварительную выборку до 128 элементов (16 на мобильных устройствах). Используйте concatMap или flatMap(... , 1).   -  person Tassos Bassoukos    schedule 25.08.2017


Ответы (1)


Observable в версии 2.x не имеют противодавления, поэтому оператор range выдаст все свои элементы, как только сможет. Однако в вашем случае можно использовать обычный счетчик, увеличиваемый вместе с уведомлением об ошибке обработчика повторной попытки:

source.retryWhen(e -> {
    int[] counter = { 0 };
    return e.takeWhile(v -> ++counter[0] < 4)
            .flatMap(v -> Observable.timer(counter[0], TimeUnit.SECONDS));
})
person akarnokd    schedule 24.08.2017