Project Reactor + flatMap + Multiple onErrorComplete - Не работает должным образом

Когда несколько onErrorContinue добавлен в конвейер для обработки исключения определенного типа, созданного из flatMap, обработка исключений не работает должным образом.

Я ожидаю, что в приведенном ниже коде элементы с 1 по 6 должны быть отброшены, а элементы с 7 по 10 должны быть использованы подписчиком.

public class FlatMapOnErrorContinueExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .flatMap(number -> {
                    if (number <= 3) {
                        return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
                    } else if (number > 3 && number <= 6) {
                        return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
                    } else {
                        return Mono.just(number);
                    }
                })
                .onErrorContinue(NumberLesserThanThree.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

                .onErrorContinue(NumberLesserThanSixButGretherThan3.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

                .onErrorContinue((throwable, object) ->
                        System.err.println("Exception: " + throwable.getMessage()))

                .subscribe(number -> System.out.println("number is " + number),
                        error -> System.err.println("Exception in Subscription " + error.getMessage()));
    }

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }
}

Вот что я получаю:

Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

Вопрос: почему 2-й onErrorContinue не вызывается, но подписчику отправляется исключение?

Дополнительное примечание: если я удалю 1-й и 2-й onErrorContinue, все исключения будут обрабатываться 3-м onErrorContinue. Я мог бы использовать этот подход для получения всех исключений, проверки типа исключения и продолжения обработки. Однако я бы хотел сделать более чистую обработку исключений, а не добавлять блок if..else.

Чем этот вопрос отличается от Почему Thread.sleep ( ) инициировать подписку на Flux.interval ()?

1) Этот вопрос об обработке исключений и порядке обработки исключений; Другой вопрос касается параллельной обработки элементов и создания основного потока, ожидающего завершения обработки всех элементов. 3) Этот вопрос не имеет никакого отношения к потокам, даже если добавить Thread.sleep(10000) после . subscribe, изменения в поведении не произойдет.


person Naveen Kumar    schedule 05.06.2020    source источник


Ответы (1)


Это снова сводится к необычному поведению onErrorContinue. Он нарушает правило в том, что он не «ловит» ошибки и не меняет их поведение в результате, он фактически позволяет операторам поддержки «смотреть вперед» на них и вести себя соответствующим образом, таким образом изменяя результат upstream .

Это странно и приводит к некоторому поведению, которое не сразу становится очевидным, как, например, здесь. Насколько мне известно, все поддерживающие операторы смотрят вперед только на оператор next onErrorContinue, а не на рекурсивный поиск всех таких операторов. Вместо этого они будут оценивать предикат следующего onErrorContinue (в данном случае, имеет ли он определенный тип), а затем вести себя соответствующим образом - либо вызывая обработчик, если предикат возвращает истину, либо выдавая ошибку ниже по потоку, если нет. (Нет случая, когда он затем перейдет к оператору next onErrorContinue, а затем к следующему, пока не будет сопоставлен предикат.)

Ясно, что это надуманный пример, но из-за этих особенностей я почти всегда рекомендую избегать onErrorContinue. Там, где задействован flatMap(), может произойти два «нормальных» пути:

  1. Если в flatMap() есть «внутренняя реактивная цепочка», то есть он вызывает другой метод или серию методов, которые возвращают издателя - тогда просто используйте onErrorResume() в конце вызова flatMap() для обработки этих ошибок. Вы можете объединить onErrorResume() в цепочку, поскольку это работает с операторами нижестоящего, а не восходящего потока. Это, безусловно, самый распространенный случай.

  2. Если flatMap() - это императивная коллекция if / else, которая возвращает разных издателей, таких как здесь, и вы хотите / должны сохранить императивный стиль, генерировать исключения вместо использования Mono.error() и ловить при необходимости, возвращая Mono.empty() в случае ошибки :

    .flatMap(number -> {
        try {
            if (number <= 3) {
                throw new NumberLessThanThree();
            } else if (number <= 6) {
                throw new NumberLessThanSixButGreaterThan3();
            } else {
                return Mono.just(number);
            }
        }
        catch(NumberLessThanThree ex) {
            //Handle it
            return Mono.empty();
        }
        catch(NumberLessThanSixButGreaterThan3 ex) {
            //As above
        }
    })

В общем, использование одного из этих двух подходов позволит намного легче рассуждать о том, что происходит.

(Для полноты картины после прочтения комментариев - это не имеет ничего общего с невозможностью завершения реактивной цепочки до выхода из основного потока.)

person Michael Berry    schedule 05.06.2020
comment
Спасибо за подробное объяснение и разъяснение, а также за пару подходов, с помощью которых я смог преодолеть ограничение. Да! это надуманная реализация, которую я написал, чтобы понять поведение; но это не очень похоже на мою, как вы упомянули, внутреннюю реактивную цепочку, которая состоит из до 4 операторов. Я бы предпочел подход №1, а не №2. Поэтому позвольте мне переупорядочить мои операторы и конвейер с помощью onErrorResume и посмотреть, можно ли реализовать и улучшить обработку ошибок. - person Naveen Kumar; 05.06.2020
comment
@NaveenKumar Ага, # 1 определенно лучший подход, если вы можете его использовать. Смешивание императивного и реактивного - это немного запах кода, если вы можете этого избежать, ИМХО, затрудняет рассуждение кода. - person Michael Berry; 05.06.2020
comment
@MichaelBerry Предположим, что нет оператора flatMap, но есть простой оператор карты, и мы используем данные от популярного издателя. Итак, если мне нужно избежать использования onErrorContinue, я думал об использовании оператора .retry (), но это может привести к тому, что некоторые элементы будут пропущены горячим издателем, поскольку retry () займет небольшое количество времени, чтобы повторно подписаться на горячего издателя . Итак, каков же выход в таком случае? - person Arunim Chopra; 18.01.2021