Как объединить вложенный поток

Как объединить два потока с вложенным? Почему выполнение этого кода никогда не заканчивается?

@Test
fun `concatenating two flux`() {

    val names = listOf("israel", "israel")

    val a = Flux.just("a", "v")
            .flatMap { it.toUpperCase().toMono() }
            .concatWith { names.joinToString(" ").toMono() }

    StepVerifier.create(a).expectNext("A", "V", "israel israel").verifyComplete()
}

когда у меня есть поток с разделенной переменной, выполнение выполняется, как ожидалось

@Test
fun `concatenating two flux`() {

    val names = listOf("israel", "israel")

    val b = names.joinToString(" ").toMono()

    val a = Flux.just("a", "v")
            .flatMap { it.toUpperCase().toMono() }

    val c = a.concatWith(b)

    StepVerifier.create(c.log()).expectNext("A", "V", "israel israel").verifyComplete()
}

person Israel Rodrigues    schedule 06.11.2019    source источник


Ответы (1)


Вам нужно использовать () вместо {} в concatWith()

// RIGHT!

    Flux.just("a", "v")
                    .flatMap { it.toUpperCase().toMono() }
                    .concatWith ( names.joinToString(" ").toMono() )

// WRONG!

    Flux.just("a", "v")
                    .flatMap { it.toUpperCase().toMono() }
                    .concatWith { names.joinToString(" ").toMono() }

Большинство методов Rx2 действительно принимают фактические лямбды, некоторые методы принимают Callable<ObservableSource<T>> вместо ObservableSource, другие принимают Function<T, ObservableSource<R>>.

Observable.defer { Observable.just(1) } - это нормально сработает.

Или observable.flatMap { Observable.just(1) } - тоже будет работать должным образом (если вы намеренно игнорируете входящие параметры).

И третий - это тот факт, что мы привыкли к Rx1, который всегда принимал Observable в своем andThen() методе, который не мог быть представлен как лямбда, поэтому нам нужно использовать () вместо {}.

person yash sugandh    schedule 07.11.2019