flatMap
не выполняет никакой параллельной работы, например: он не меняет потоки. Самый простой пример
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
Это печатает:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[main] INFO reactor.Flux.FlatMap.1 - onNext(10)
[main] INFO reactor.Flux.FlatMap.1 - onNext(11)
[main] INFO reactor.Flux.FlatMap.1 - onNext(20)
[main] INFO reactor.Flux.FlatMap.1 - onNext(21)
[main] INFO reactor.Flux.FlatMap.1 - onNext(30)
[main] INFO reactor.Flux.FlatMap.1 - onNext(31)
[main] INFO reactor.Flux.FlatMap.1 - onNext(40)
[main] INFO reactor.Flux.FlatMap.1 - onNext(41)
[main] INFO reactor.Flux.FlatMap.1 - onNext(50)
[main] INFO reactor.Flux.FlatMap.1 - onNext(51)
[main] INFO reactor.Flux.FlatMap.1 - onComplete()
Как видите, производит только в main
. Если вы добавите publishOn
после начального диапазона, flatMap
создаст все в том же единственном потоке, на который переключится publishOn.
Однако flatMap
подписывается на несколько внутренних Publisher
, вплоть до параметра concurrency
со значением по умолчанию Queues.SMALL_BUFFER_SIZE
(256).
Это означает, что если вы установите значение 3
, flatMap
сопоставит 3 исходных элемента со своими внутренними Publisher
и подпишется на этих издателей, но будет ждать завершения хотя бы одного, прежде чем начнет сопоставлять другие исходные элементы.
Если внутренний Publisher
использует publishOn
или subscribeOn
, то flatMap
естественным образом позволит своим событиям происходить в определенных на тот момент потоках:
Flux.range(1, 5).hide()
.flatMap(v -> Flux.range(v * 10, 2)
.publishOn(Schedulers.newParallel("foo", 3)))
.flatMap(v -> Flux.range(10 * v, 2))
.log()
.blockLast(); //for test purpose
Что печатает:
[main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(10)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(11)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(20)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(21)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(30)
[foo-1] INFO reactor.Flux.FlatMap.1 - onNext(31)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(50)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(51)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(40)
[foo-4] INFO reactor.Flux.FlatMap.1 - onNext(41)
[foo-4] INFO reactor.Flux.FlatMap.1 - onComplete()
person
Simon Baslé
schedule
23.04.2018