Когда FlatMap будет прослушивать несколько источников одновременно?

В каких ситуациях Flux::flatMap одновременно прослушивает несколько источников (0...бесконечность)?


Во время экспериментов я обнаружил, что когда восходящий поток отправляет сигналы flatMap в потоке thread-upstream-1, и есть N внутренних потоков, которые будет прослушивать flatMap, и каждый из них отправляет сигналы в разные потоки: thread-inner-stream-i для 1<=i<=N, чем для каждого 1<=i<=N, если thread-upstream-1 != thread-inner-stream-i, flatMap будет прослушивать одновременно все внутренние потоки.

Я думаю, что это не совсем так, и я пропустил некоторые другие сценарии.


person Stav Alfi    schedule 22.04.2018    source источник


Ответы (1)


flatMap не выполняет никакой параллельной работы, например: он не меняет потоки. Самый простой пример

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose

Это печатает:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO  reactor.Flux.FlatMap.1 - onComplete()

Как видите, производит только в main. Если вы добавите publishOn после начального диапазона, flatMap создаст все в том же единственном потоке, на который переключится publishOn.

Однако flatMap подписывается на несколько внутренних Publisher, вплоть до параметра concurrency со значением по умолчанию Queues.SMALL_BUFFER_SIZE (256).

Это означает, что если вы установите значение 3, flatMap сопоставит 3 исходных элемента со своими внутренними Publisher и подпишется на этих издателей, но будет ждать завершения хотя бы одного, прежде чем начнет сопоставлять другие исходные элементы.

Если внутренний Publisher использует publishOn или subscribeOn, то flatMap естественным образом позволит своим событиям происходить в определенных на тот момент потоках:

Flux.range(1, 5).hide()
    .flatMap(v -> Flux.range(v * 10, 2)
                      .publishOn(Schedulers.newParallel("foo", 3)))
    .flatMap(v -> Flux.range(10 * v, 2))
    .log()
    .blockLast(); //for test purpose

Что печатает:

[main] INFO  reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO  reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO  reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO  reactor.Flux.FlatMap.1 - onComplete()
person Simon Baslé    schedule 23.04.2018