Приведет ли возврат Mono ‹ServerResponse› к (злой) синхронной, блокирующей связи клиент / сервер?

Я новичок в Spring Reactor и WebFlux и немного смущен потоком событий в функциональной сети Spring. Пример: у меня есть функция-обработчик, возвращающая Mono<ServerResponse>. Внутри него выполняется метод репозитория findAll(), возвращающий Flux<T>. В соответствии с реактивным манифестом, чтобы быть асинхронным, неблокирующим и допускать обратное давление, я хотел бы видеть onNext() для каждого элемента, возвращаемого из репозитория. Однако, просматривая журналы сервера во время обработки запроса, я вижу только одно событие onNext(), что имеет смысл, поскольку мой возвращаемый тип - это Mono, содержащий ответ:

Функция маршрутизатора

@Bean
 public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
     return RouterFunctions
             .route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
                     , itemsHandler::getAll);
}

Функция-обработчик

Mono<ServerResponse> getAll(ServerRequest request) {
    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(itemRepository.findAll(), Item.class)
            .log("GET items");
}

Журнал событий

2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | request(unbounded)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onComplete()

Напротив, реализуя классический метод аннотированного контроллера Spring с Flux<T> в качестве возвращаемого типа, я буду видеть onNext() для каждого экземпляра T (то есть каждого элемента набора результатов), что мне кажется более "правильным" (теперь клиент имеет контроль над потоком событий и т. д.):

Контроллер

@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
    return itemRepository
            .findAll()
            .log("GET items");
}

Журнал

2020-05-10 15:14:04.135  INFO 19096 --- [ctor-http-nio-5] GET items                                : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136  INFO 19096 --- [ctor-http-nio-5] GET items                                : request(unbounded)
2020-05-10 15:14:04.137  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onComplete()

Это смущает. Позвольте мне уточнить:

  • Использование Mono<ServerResponse> кажется злом в том смысле, что оно инкапсулирует весь набор результатов в одно событие, что для меня похоже на нарушение реактивных принципов асинхронного, неблокирующего потока событий с поддержкой обратного давления. Разве это не лишает клиента контроля? Мне это кажется традиционным, блокирующим взаимодействием клиент / сервер.
  • Возврат Flux<T> напрямую выглядит намного приятнее, потому что он позволяет обрабатывать события для каждого результата и контролировать обратное давление.

Мои вопросы:

  • Каковы последствия создания Mono<ServerResponse>? Приведет ли это к блокирующему синхронному взаимодействию с выдачей onNext() только после того, как все элементы будут прочитаны из репозитория? Потеряю ли я функцию противодавления и т. Д.?
  • Как я могу заставить серверную часть функционального стиля отправлять onNext() для каждого элемента в наборе результатов?
  • Что было бы наилучшей практикой с точки зрения типа возвращаемого значения функции обработчика функционального стиля, которая является полностью реактивной, то есть неблокирующей, асинхронной и совместимой с противодавлением? Я не уверен, не нарушает ли Mono<ServerResponse> эти реактивные принципы.

Я могу ошибаться или упускать что-то важное. Спасибо за вашу помощь!


person Mike Floyd    schedule 10.05.2020    source источник


Ответы (1)


Все зависит от клиента, потребляющего ServerResponse. Согласно документации WebFlux (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux) настройка функций обработчика для возврата Mono<ServerResponse> независимо от количества возвращенных элементов - это стандартный способ и абсолютно нормально - пока клиент правильно обрабатывает лежащие в основе Flux<T> все в порядке. Моя проблема возникла из-за того, что я тестировал конечные точки с помощью curl, который не может обнаружить лежащий в основе Flux. Используя клиент с функциональным стилем (например, org.springframework.web.reactive.function.client.WebClient), Mono<ServerResponse> можно сначала десериализовать в Flux<T>, включив все приятные реактивные функции и заставив наши onNext() события отображаться.

Код клиента

Вызов бэкэнда таким образом, десериализация ServerResponse в Flux:

@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
    return  webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
            .retrieve()
            .bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
            .log("GET all items from server");
}

В результате будут видны все onNext() события, что позволит обрабатывать события на стороне клиента:

2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : request(unbounded)
2020-05-10 16:10:10.511  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onComplete()

Так что все в порядке и полностью реагирует, пока происходит правильная обработка ответа клиентом.

person Mike Floyd    schedule 10.05.2020
comment
если вы хотите передать ответ от curl, вы можете передать свойство -N, что означает, что curl не будет буферизовать ответы объясняетhell.com/explain?cmd=curl+-N, поэтому curl -N www.example.com - person Toerktumlare; 11.05.2020