Как я могу условно добавить асинхронную операцию в середине потока RxJava?

Вот упрощенная версия того, что я пытаюсь сделать (используя Kotlin и RxJava)

makeServerCall()
                .doOnNext {
                    doStuff(it)
                }
                //TODO: if it == 0, call asyncOperation() and wait for its callback to fire 
                //before running the rest of the stream. Otherwise immediately run the rest
                //of the stream
                .flatMap {
                    observable1(it)
                    observable2(it)
                    Observable.merge(
                            getSpotSearchObservable(observable1),
                            getSpotSearchObservable(observable2)
                }
                .subscribeBy(onNext = {
                allDone()
                    view?
                })

Как втиснуть вызов asyncOperation() и заставить остальную часть потока ждать срабатывания обратного вызова, но только при выполнении определенного условия? Кажется, что это тривиальная операция в Rx, но на ум не приходит никакого очевидного решения.


person Chad Schultz    schedule 14.09.2017    source источник
comment
вы рассматривали switchIfEmpty? Когда it ==0, если он вам больше не нужен для последующих операций, вы можете переключиться на другой наблюдаемый поток (asyncOperation)   -  person Jon    schedule 14.09.2017


Ответы (1)


Плоская карта это!

.flatMap {
    if (it == 0) {
        return@flatMap asyncOperation()
            .ignoreElements()
            .andThen(Observable.just(0))
    }
    return@flatMap Observable.just(it)
}
.flatMap {
    observable1(it)
    observable2(it)
    Observable.merge(
        getSpotSearchObservable(observable1),
        getSpotSearchObservable(observable2)
    )
}
person akarnokd    schedule 14.09.2017