RxJava: concatMap() с zip() зависает

У меня есть фиктивный сетевой источник данных:

    fun networkDataSource(): Single<List<Int>> {
        return Single.just((0 until 100).toList())
                .delay(150, TimeUnit.MILLISECONDS)
    }

Вот бесконечная наблюдаемая. Его основное использование заключается в том, что его вычисления должны быть «защищены», чтобы одно значение вычислялось только один раз. (Здесь значение равно 1.)

    val endless = Observable
            .just(1)
            .observeOn(Schedulers.io())
            .delay(500, TimeUnit.MILLISECONDS)
            // Counts as heavy operation, do not calculate this here once again
            .doOnNext { println("=> E: Calculated once") }
            .cache()
            //.doOnNext { println("=> E: From cache") }
            .repeat()

Основной поток просто выдает значения:

    val mainStream = Observable.range(0, 6)
            .doOnNext { println("=> M: Main stream $it") }

Задание:

Заархивируйте 3 наблюдаемых вместе и оптимизируйте использование сети, чтобы она не вызывалась больше, чем необходимо. (После того, как число данных — в данном случае целых чисел — будет достигнуто.

Подход:

    mainStream
            .concatMap {index ->
                Observables.zip(
                        Observable.just(index),
                        endless,
                        networkDataSource()
                                .toObservable()
                                .doOnNext { println("#> N: Network data fetch $index") }
                )
            }
            .doOnNext { println("=> After concatmap: ${it.first}") }
            .take(4)
            .doOnNext { println("=> After take: ${it.first}") }
            .subscribe(
                    { println("=> Last onnext") },
                    { it.printStackTrace() },
                    { synchronized(check) { check.notifyAll() } }
            )

Завершение заблокированной темы - нужно только для тестирования:

synchronized(check) {
    check.wait()
}
println("Ending")

Вот результат:

=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext

Это выход, и он застрял после второго дубля. (Не продолжается через минуту). У меня вопрос, почему так происходит?

В качестве примечания, если я раскомментирую строку из наблюдаемого endless:

.doOnNext { println("=> E: From cache") }

Он собирается залить консоль этой строкой. Почему endless вызывается так много раз вместо каждой итерации?

flatMap() здесь не является решением, потому что он не принимает во внимание take(4) и завершает все сетевые вызовы.

Итак, как мне заставить concatMap() работать?

(Я также добавил тег RxJS, потому что это реактивная проблема, а не касающаяся Kotlin. JS-решения также приветствуются, если эти функции существуют в библиотеке RxJava.)

Редактировать.:

Я просмотрел код, и 2 вывода, вероятно, связаны с параметром prefetch:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

Но я до сих пор не понимаю, как это работает. Я только читал, что concatMap() это flatmap(), но он ждет результатов каждого.


person andras    schedule 29.09.2018    source источник
comment
Вся установка, вероятно, будет выполняться в том же потоке после первого элемента, и repeat в endless никогда не откажется от потока, предотвращая выполнение любого другого оператора.   -  person akarnokd    schedule 29.09.2018
comment
@akarnokd Спасибо, вы правы, но я бы не подумал об этом. Добавлено .subscribeOn(Schedulers.io()) после repeat() в наблюдаемом endless, и теперь оно завершается. Хотя он излучает слишком много после того, как repeat() и throttleFirst было бы слишком сложно добавить в конце. --- Не могли бы вы объяснить или дать ссылку, как решить мою проблему? Тот, который я сейчас делаю, выглядит хакерским и далеким от профессионального, я бы не хотел оставлять его в приложении для Android. --- --- Напишите ответ, и я его приму.   -  person andras    schedule 29.09.2018
comment
Мне нет смысла повторять этот кеш, потому что вы будете использовать только один и только один его элемент.   -  person akarnokd    schedule 29.09.2018
comment
@akarnokd Вау, спасибо, не знал, что это будет работать без repeat().   -  person andras    schedule 29.09.2018


Ответы (1)


Из комментариев:

Вся установка, вероятно, будет выполняться в том же потоке после первого элемента, и repeat в endless никогда не откажется от потока, предотвращая выполнение любого другого оператора. Мне нет смысла повторять это cache, потому что вы будете использовать только один и только один его элемент.

person akarnokd    schedule 01.10.2018