Объединение нескольких издателей, трансформирующих ответы

Я хочу связать одно моно после каждого события потока. Издателю моно потребуется информация о каждом событии, опубликованном потоком. Ответ должен быть потоком с данными о событии потока и моно-ответом.

После раскопок я получаю карту внутри flatMap. Код выглядит так:

override fun searchPets(petSearch: PetSearch): Flux<Pet> {
    return petRepository
        .searchPets(petSearch) // returns Flux<pet>
        .flatMap { pet -> 
            petService
            .getCollarForMyPet() // returns Mono<collar>
            .map { collar -> PetConverter.addCollarToPet(pet, collar) } //returns pet (now with with collar)
        }
}

Мои основные опасения:

  • Пахнет ли код при использовании карты внутри FlatMap?
  • Будет ли содержимое переменной питомца страдать от условий гонки с приближением нескольких событий потока, а также моно-событий?
  • Есть ли лучший способ подойти к такому поведению?

person Pedro Pepê    schedule 16.03.2021    source источник


Ответы (1)


Такой подход прекрасен.

Спецификация реактивных потоков требует, чтобы onNext события не перекрывались, поэтому не будет проблем с условиями гонки.

flatMap тем не менее вводит параллелизм, поэтому несколько вызовов PetService будут выполняться параллельно. Это не должно быть проблемой, если только searchPets не испускает несколько экземпляров Pet дважды.

Не то чтобы из-за этого параллелизма flatMap может как бы переупорядочивать питомцев в этом сценарии. Представьте, что поиск возвращает petA, затем petB, но вызов petService для petA занимает больше времени. В выводе flatMap сначала будет отправлено petB (с установленным воротником), затем petA.

person Simon Baslé    schedule 18.03.2021