У меня есть фиктивный сетевой источник данных:
fun networkDataSource(): Single<List<Int>> {
return Single.just((0 until 100).toList())
.delay(150, TimeUnit.MILLISECONDS)
}
Вот бесконечная наблюдаемая. Его основное использование заключается в том, что его вычисления должны быть «защищены», чтобы одно значение вычислялось только один раз. (Здесь значение равно 1.)
val endless = Observable
.just(1)
.observeOn(Schedulers.io())
.delay(500, TimeUnit.MILLISECONDS)
// Counts as heavy operation, do not calculate this here once again
.doOnNext { println("=> E: Calculated once") }
.cache()
//.doOnNext { println("=> E: From cache") }
.repeat()
Основной поток просто выдает значения:
val mainStream = Observable.range(0, 6)
.doOnNext { println("=> M: Main stream $it") }
Задание:
Заархивируйте 3 наблюдаемых вместе и оптимизируйте использование сети, чтобы она не вызывалась больше, чем необходимо. (После того, как число данных — в данном случае целых чисел — будет достигнуто.
Подход:
mainStream
.concatMap {index ->
Observables.zip(
Observable.just(index),
endless,
networkDataSource()
.toObservable()
.doOnNext { println("#> N: Network data fetch $index") }
)
}
.doOnNext { println("=> After concatmap: ${it.first}") }
.take(4)
.doOnNext { println("=> After take: ${it.first}") }
.subscribe(
{ println("=> Last onnext") },
{ it.printStackTrace() },
{ synchronized(check) { check.notifyAll() } }
)
Завершение заблокированной темы - нужно только для тестирования:
synchronized(check) {
check.wait()
}
println("Ending")
Вот результат:
=> M: Main stream 0
=> M: Main stream 1
=> M: Main stream 2
=> M: Main stream 3
=> M: Main stream 4
=> M: Main stream 5
#> N: Network data fetch 0
=> E: Calculated once
=> After concatmap: 0
=> After take: 0
=> Last onnext
#> N: Network data fetch 1
=> After concatmap: 1
=> After take: 1
=> Last onnext
Это выход, и он застрял после второго дубля. (Не продолжается через минуту). У меня вопрос, почему так происходит?
В качестве примечания, если я раскомментирую строку из наблюдаемого endless
:
.doOnNext { println("=> E: From cache") }
Он собирается залить консоль этой строкой. Почему endless
вызывается так много раз вместо каждой итерации?
flatMap()
здесь не является решением, потому что он не принимает во внимание take(4)
и завершает все сетевые вызовы.
Итак, как мне заставить concatMap()
работать?
(Я также добавил тег RxJS, потому что это реактивная проблема, а не касающаяся Kotlin. JS-решения также приветствуются, если эти функции существуют в библиотеке RxJava.)
Редактировать.:
Я просмотрел код, и 2 вывода, вероятно, связаны с параметром prefetch
:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return concatMap(mapper, 2);
}
Но я до сих пор не понимаю, как это работает. Я только читал, что concatMap()
это flatmap()
, но он ждет результатов каждого.
repeat
вendless
никогда не откажется от потока, предотвращая выполнение любого другого оператора. - person akarnokd   schedule 29.09.2018.subscribeOn(Schedulers.io())
послеrepeat()
в наблюдаемомendless
, и теперь оно завершается. Хотя он излучает слишком много после того, какrepeat()
иthrottleFirst
было бы слишком сложно добавить в конце. --- Не могли бы вы объяснить или дать ссылку, как решить мою проблему? Тот, который я сейчас делаю, выглядит хакерским и далеким от профессионального, я бы не хотел оставлять его в приложении для Android. --- --- Напишите ответ, и я его приму. - person andras   schedule 29.09.2018repeat()
. - person andras   schedule 29.09.2018