Как мне дождаться, пока несколько моно выполнятся одновременно, и получить значение

Похоже на вопрос Ожидание завершения выполнения экземпляров Reactor Mono но я хочу получить идеальный результат на другом моно. Вот код, который у меня есть. Я попробовал решение с материализацией, но это не сработало.

    @GetMapping("/bounced")
    public Mono<Map<String, Object>> bounced(
        @RequestHeader("X-B3-Traceid") String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
    ) {

        final Mono<Map<String, Object>> sample = webClient.get()
            .uri("http://sample:8080/")
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> httpGet = webClient.get()
            .uri("http://httpbin.org/get")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> anything = webClient.get()
            .uri("http://httpbin.org/anything/foo")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

/*
   Tried this and it does start it up, but it triggers another "download" in the return block.

        Mono.when(anything, sample, httpGet)
            .subscribe();
            .materialize()
            .block();
*/
        return Mono.just(Map.of("traceFromBounced", traceId,
            "anything", anything.block(),
            "sample", sample.block(),
            "httpGet", httpGet.block()));

person Archimedes Trajano    schedule 04.05.2020    source источник
comment
используйте .zip для объединения   -  person K.Nicholas    schedule 04.05.2020


Ответы (1)


Основываясь на комментарии @ K.Nicholas, я заработал

    @GetMapping("/bounced")
    public Mono<Map<String, Object>> bounced(
        @RequestHeader("X-B3-Traceid") String traceId,
        @RequestHeader(HttpHeaders.AUTHORIZATION) String authorization
    ) {

        final Mono<Map<String, Object>> sample = webClient.get()
            .uri("http://sample:8080/")
            .header(HttpHeaders.AUTHORIZATION, authorization)
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> httpGet = webClient.get()
            .uri("http://httpbin.org/get")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        final Mono<Map<String, Object>> anything = webClient.get()
            .uri("http://httpbin.org/anything/foo")
            .retrieve()
            .bodyToMono(new ParameterizedTypeReference<>() {
            });

        return Mono.zip(anything, sample, httpGet)
            .map(t -> Map.of("traceFromBounced", traceId,
                "anything", t.getT1(),
                "sample", t.getT2(),
                "httpGet", t.getT3()));

    }

Вот вывод zipkin, который показывает, что он работает параллельно  введите описание изображения здесь

person Archimedes Trajano    schedule 04.05.2020
comment
Вы должны использовать слияние и запланировать каждый внутренний поток в отдельном планировщике, если вы не хотите, чтобы все ответы были вместе. Что-то вроде Flux.merge (sample.subscribeOn (Schedulers.elastic ()), httpGet.subscribeOn (Schedulers.elastic ()), something.subscribeOn (Schedulers.elastic ())). Subscribe (); - person Prashant Pandey; 04.05.2020
comment
Неважно, я только что увидел, что они вам нужны вместе. zip - это то, что вам нужно. - person Prashant Pandey; 04.05.2020