Когда несколько onErrorContinue добавлен в конвейер для обработки исключения определенного типа, созданного из flatMap, обработка исключений не работает должным образом.
Я ожидаю, что в приведенном ниже коде элементы с 1 по 6 должны быть отброшены, а элементы с 7 по 10 должны быть использованы подписчиком.
public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))
.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))
.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))
.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}
public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}
public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}
Вот что я получаю:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6
Вопрос: почему 2-й onErrorContinue
не вызывается, но подписчику отправляется исключение?
Дополнительное примечание: если я удалю 1-й и 2-й onErrorContinue
, все исключения будут обрабатываться 3-м onErrorContinue
. Я мог бы использовать этот подход для получения всех исключений, проверки типа исключения и продолжения обработки. Однако я бы хотел сделать более чистую обработку исключений, а не добавлять блок if..else
.
Чем этот вопрос отличается от Почему Thread.sleep ( ) инициировать подписку на Flux.interval ()?
1) Этот вопрос об обработке исключений и порядке обработки исключений; Другой вопрос касается параллельной обработки элементов и создания основного потока, ожидающего завершения обработки всех элементов. 3) Этот вопрос не имеет никакого отношения к потокам, даже если добавить Thread.sleep(10000)
после . subscribe
, изменения в поведении не произойдет.