Реактивные транзакции Quarks Postgresql

Я просмотрел руководство по работе с реактивными клиентами SQL (https://quarkus.io/guides/reactive-sql-clients#using), но я не могу понять, как можно работать с транзакциями. Допустим, я бы хотел улучшить это демонстрационное фруктовое приложение с помощью транзакций.

Как я могу использовать следующий метод для использования транзакции, которая также отменяет все внесенные изменения, если что-то в транзакции не удалось?

public static Multi<Fruit> findAll(PgPool client) {
        return client.query("SELECT id, name FROM fruits ORDER BY name ASC")
                .onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
                .onItem().apply(Fruit::from);
    }

person Newcomer66    schedule 02.05.2020    source источник


Ответы (1)


Вы можете использовать родительский этап с then() или onItem().produceMulti(), чтобы предоставить доступ ко всем подэтапам к элементу родительского этапа (в данном случае к транзакции). Это позволит последующим подэтапам получить прямой доступ к объекту транзакции для закрытия / отката.

Например:

return pgPool.begin()
    .onItem().produceMulti(tx -> {
        return tx.query("DELETE FROM fruits").execute()
                 .onItem().invoke(delete -> tx.query("SELECT id, name FROM fruits ORDER BY name ASC").execute())
                 .onItem().produceMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
                 .onItem().apply(Fruit::from)
                 .onFailure().invoke(ex -> tx.rollback())
                 .on().termination(() -> tx.close());
        });
person Andy Guibert    schedule 02.05.2020
comment
Спасибо! Я тоже думал об этом подходе, но код, который вы предоставили, похоже, не закрывает соединение / транзакцию, когда она успешна. После 15 запусков этого кода приложение переходит в стадию блокировки, так как все соединения остаются открытыми. Я попытался добавить onCompleted в конце, но он никогда не вызывается. Как мне убедиться, что он каждый раз закрывает соединение / транзакцию? Также; есть ли шанс, что вы могли бы показать мне, как я могу сначала добавить запрос DELETE, а затем выбрать, откатывая все, включая запрос DELETE, когда SELECT терпит неудачу? - person Newcomer66; 02.05.2020
comment
хороший момент, я обновил свой ответ, чтобы теперь использовать другой подход, я также включил начальную операцию DELETE в поток - person Andy Guibert; 04.05.2020
comment
Вместо invoke вам нужно: produceUni, вместо on (). Termination () вы можете использовать tx.commit () непосредственно перед управлением сбоями. - person Clement; 04.05.2020