Здравствуйте, я новичок в Webflux. Я следую руководству по созданию реактивных микросервисов. В своем проекте я столкнулся со следующей проблемой.
Я хочу создать crud api для службы продукта, и ниже приведен метод Create.
@Override
public Product createProduct(Product product) {
Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
productEntity.ifPresent((prod -> {
throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
}));
ProductEntity entity = mapper.apiToEntity(product);
Mono<Product> newProduct = repository.save(entity)
.log()
.map(mapper::entityToApi);
return newProduct.block();
}
Проблема в том, что когда я вызываю этот метод от почтальона, я получаю сообщение об ошибке block () / blockFirst () / blockLast () are blocking, что не поддерживается в потоке response-http-nio-3, но когда я использую StreamListener, этот вызов работает нормально. Слушатель потока получает события из канала rabbit-mq
StreamListener
@EnableBinding(Sink.class)
public class MessageProcessor {
private final ProductService productService;
public MessageProcessor(ProductService productService) {
this.productService = productService;
}
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Product> event) {
switch (event.getEventType()) {
case CREATE:
Product product = event.getData();
LOG.info("Create product with ID: {}", product.getProductId());
productService.createProduct(product);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
LOG.warn(errorMessage);
throw new EventProcessingException(errorMessage);
}
}
}
У меня два вопроса.
- Почему это работает с StreamListener, а не с простым запросом?
- Есть ли в webflux правильный способ вернуть объект Mono, или мы всегда должны возвращать Mono?