Контроллер Webflux возвращает объект вместо Mono

Здравствуйте, я новичок в Webflux. Я следую руководству по созданию реактивных микросервисов. В своем проекте я столкнулся со следующей проблемой.

Я хочу создать crud api для службы продукта, и ниже приведен метод Create.

@Override
public Product createProduct(Product product) {

    Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
             productEntity.ifPresent((prod -> {
                  throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
              }));

    ProductEntity entity = mapper.apiToEntity(product);
    Mono<Product> newProduct = repository.save(entity)
        .log()
        .map(mapper::entityToApi);

    return newProduct.block();
}

Проблема в том, что когда я вызываю этот метод от почтальона, я получаю сообщение об ошибке block () / blockFirst () / blockLast () are blocking, что не поддерживается в потоке response-http-nio-3, но когда я использую StreamListener, этот вызов работает нормально. Слушатель потока получает события из канала rabbit-mq

StreamListener

@EnableBinding(Sink.class)
public class MessageProcessor {

    private final ProductService productService;

    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

       
        switch (event.getEventType()) {

            case CREATE:
                Product product = event.getData();
                LOG.info("Create product with ID: {}", product.getProductId());
                productService.createProduct(product);
                break;

   
            default:
                String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
                LOG.warn(errorMessage);
                throw new EventProcessingException(errorMessage);
        }
    }
}

У меня два вопроса.

  1. Почему это работает с StreamListener, а не с простым запросом?
  2. Есть ли в webflux правильный способ вернуть объект Mono, или мы всегда должны возвращать Mono?

person Kasiaras Dimitrios    schedule 06.08.2020    source источник
comment
Это очень простой вопрос, я предлагаю вам ознакомиться с основами реактивного программирования, что такое Mono на самом деле и почему блокировка не разрешена в реактивных приложениях. Документация по запуску реактора - хорошее место для начала.   -  person Toerktumlare    schedule 06.08.2020
comment
начните здесь projectreactor.io/docs/core/release/reference/#intro- реактивный   -  person Toerktumlare    schedule 06.08.2020


Ответы (1)


Ваш метод создания хотел бы выглядеть более похожим на этот, и вы хотели бы вернуть Mono<Product> из вашего контроллера, а не только объект.

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .switchIfEmpty(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }

Как прокомментировал @Thomas, вы нарушаете некоторые основы реактивного кодирования и не получаете преимуществ от использования block (), и вам следует прочитать об этом больше. Например, реактивный репозиторий mongo, который вы используете, будет возвращать Mono, у которого есть свои собственные методы для обработки, если он пуст, без необходимости использовать Optional, как показано выше.

ИЗМЕНИТЬ, чтобы отобразить ошибку, если объект уже существует, в противном случае сохраните

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> exists)
        .flatMap(exists -> Mono.error(new Exception("my exception")))
        .then(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }
person Joseph Berry    schedule 06.08.2020
comment
Спасибо, я буду следить за началом работы с документацией по реактору, но, как я вижу в вашем ответе, когда пользователь уже существует, ошибка не выдается ... Есть ли способ выдать ошибку, если она не пуста? - person Kasiaras Dimitrios; 06.08.2020
comment
Вы бы не бросили ошибку в традиционном смысле, см. Отредактируйте, чтобы ответить - person Joseph Berry; 06.08.2020