Я новичок в Spring Reactor и WebFlux и немного смущен потоком событий в функциональной сети Spring. Пример: у меня есть функция-обработчик, возвращающая Mono<ServerResponse>
. Внутри него выполняется метод репозитория findAll()
, возвращающий Flux<T>
. В соответствии с реактивным манифестом, чтобы быть асинхронным, неблокирующим и допускать обратное давление, я хотел бы видеть onNext()
для каждого элемента, возвращаемого из репозитория. Однако, просматривая журналы сервера во время обработки запроса, я вижу только одно событие onNext()
, что имеет смысл, поскольку мой возвращаемый тип - это Mono
, содержащий ответ:
Функция маршрутизатора
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
Функция-обработчик
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
Журнал событий
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
Напротив, реализуя классический метод аннотированного контроллера Spring с Flux<T>
в качестве возвращаемого типа, я буду видеть onNext()
для каждого экземпляра T
(то есть каждого элемента набора результатов), что мне кажется более "правильным" (теперь клиент имеет контроль над потоком событий и т. д.):
Контроллер
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
Журнал
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
Это смущает. Позвольте мне уточнить:
- Использование
Mono<ServerResponse>
кажется злом в том смысле, что оно инкапсулирует весь набор результатов в одно событие, что для меня похоже на нарушение реактивных принципов асинхронного, неблокирующего потока событий с поддержкой обратного давления. Разве это не лишает клиента контроля? Мне это кажется традиционным, блокирующим взаимодействием клиент / сервер. - Возврат
Flux<T>
напрямую выглядит намного приятнее, потому что он позволяет обрабатывать события для каждого результата и контролировать обратное давление.
Мои вопросы:
- Каковы последствия создания
Mono<ServerResponse>
? Приведет ли это к блокирующему синхронному взаимодействию с выдачейonNext()
только после того, как все элементы будут прочитаны из репозитория? Потеряю ли я функцию противодавления и т. Д.? - Как я могу заставить серверную часть функционального стиля отправлять
onNext()
для каждого элемента в наборе результатов? - Что было бы наилучшей практикой с точки зрения типа возвращаемого значения функции обработчика функционального стиля, которая является полностью реактивной, то есть неблокирующей, асинхронной и совместимой с противодавлением? Я не уверен, не нарушает ли
Mono<ServerResponse>
эти реактивные принципы.
Я могу ошибаться или упускать что-то важное. Спасибо за вашу помощь!