Почему я получаю сигнал onComplete, когда в реактивном потребителе Spring Cloud Stream возникает исключение?

Я использую Spring Reactor с Spring Cloud Stream (GCP Pub/Sub Binder) и сталкиваюсь с проблемами обработки ошибок. Я могу воспроизвести проблему на очень простом примере:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .map(msg -> {
            if (true) { 
                throw new RuntimeException("exception encountered!");
            }
            return msg;
        })
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

Поведение, которое я ожидаю, состоит в том, чтобы увидеть Не удалось использовать печать сообщения, однако это не то, что, по-видимому, происходит. При добавлении вызова .log() в цепочку я вижу сигналы onNext/onComplete, я ожидаю увидеть сигналы onError.

Мой фактический код выглядит примерно так:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

Я заметил, что глубоко в своем классе обслуживания я пытался выполнять обработку ошибок на своих издателях Reactor. Однако сигнал onError не возникнет при использовании Spring Cloud Stream. Если бы я просто вызывал свою службу как таковую myService.processMessage(msg) в модульном тесте и имитировал исключение, моя реактивная цепочка правильно распространяла бы сигналы об ошибках.

Кажется, это проблема, когда я подключаюсь к Spring Cloud Stream. Мне интересно, выполняет ли Spring Cloud Function/Stream какие-либо глобальные ошибки?

В моем нетривиальном коде я замечаю это сообщение об ошибке, которое может иметь какое-то отношение к тому, почему я не получаю сигналов об ошибках?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

Чтобы еще больше запутаться, я могу получить сигнал onError в своей реактивной цепочке, если я переключу привязку Spring Cloud Stream на нереактивную реализацию следующим образом:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
        .subscribe();
}

person Jon Catanio    schedule 01.10.2020    source источник


Ответы (2)


Итак, это то, что я собрал из своих собственных исследований, может быть, это может помочь другим. Предупреждение, возможно, я не использую правильный язык Spring Reactor, но вот как я решил это...

В Hoxton.SR5 onErrorContinue был включен в реактивную привязку, которая управляла подпиской Flux. Проблема с onErrorContinue заключается в том, что он влияет на операторов upstream, применяя функцию BiConsumer к оператору, в котором произошел сбой (если поддерживается).

Это означает, что при возникновении ошибки в наших операторах map/flatMap onErrorContinue BiConsumer включится и изменит нисходящий сигнал либо на onComplete() (Mono<T>), либо на request(...) (если он запрашивает новый элемент из Flux<T>). Это привело к тому, что наши операторы doOnError(...) не выполнялись, так как не было сигналов onError().

В конце концов команда SCS решила удалить эту оболочку обработки ошибок. Hoxton.SR6 больше не имеет этого onErrorContinue. Однако это означало, что исключения, распространяющиеся до привязки SCS, приведут к разрыву подписки Flux. Последующие сообщения некуда было бы маршрутизировать, так как не было подписчиков.

Эта обработка ошибок была передана клиентам. Мы добавляем оператор onErrorResume к внутреннему публикатору, чтобы эффективно отбрасывать сигналы об ошибках. При возникновении ошибки в myService::processMessage издателе onErrorResume переключит издателей на резервного издателя, который был передан в качестве параметра, и возобновит работу с этой точки в цепочке операторов. В нашем случае этот резервный издатель просто возвращает Mono.empty(), что позволяет нам отбрасывать сигналы об ошибках, в то же время позволяя работать внутренним механизмам обработки ошибок, не влияя при этом на внешний исходный издатель.

onErrorResume Пример/Пояснение

Описанную выше технику можно проиллюстрировать на очень простом примере.

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Flux.just(4, 5, 6))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Flux<Integer> выше выведет следующее:

Element: 1
Element: 4
Element: 5
Element: 6

Поскольку в элементе 2 обнаружена ошибка, срабатывает onErrorResume резервный вариант, и новый издатель становится Flux.just(4, 5, 6) фактически возобновляющим резервный вариант. В нашем случае мы не хотим влиять на исходного издателя (например, Flux.just(1, 2, 3)). Мы хотим просто отбросить ошибочный элемент (2) и перейти к следующему элементу (3).

Мы не можем просто изменить Flux.just(4, 5, 6) на Flux.empty() или Mono.empty() следующим образом:

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Это приведет к выводу следующего:

Element: 1

Это связано с тем, что onErrorResume заменил вышестоящих издателей резервным издателем (например, Mono.empty()) и с этого момента возобновил.

Чтобы достичь желаемого результата:

Element: 1
Element: 3

Мы должны разместить оператор onErrorResume во внутреннем издателе flatMap:

public Mono<Integer> func(int i) {
    return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}

Flux.just(1, 2, 3)
    .flatMap(i -> func(i)
        onErrorResume(t -> Mono.empty()))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Теперь onErrorResume влияет только на внутренний издатель, возвращаемый func(i). Если операторы в func(i) произойдут ошибки, onErrorResume переключится на Mono.empty(), эффективно завершая Mono<T> без взрыва. Это также по-прежнему позволяет применять операторы обработки ошибок (например, doOnError) внутри func(i) до запуска резервного варианта. Это связано с тем, что, в отличие от onErrorContinue, он не влияет на восходящие операторы и изменяет следующий сигнал в месте ошибки.

Окончательное решение

Повторно используя фрагмент кода в моем вопросе, я обновил свою версию Spring Cloud до Hoxton.SR6 и изменил код примерно так:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(msg -> myService.processMessage(msg)
            .onErrorResume(throwable -> Mono.empty())
        )
        .then();
}

Обратите внимание, что onErrorResume находится на внутреннем издателе (внутри flatMap).

person Jon Catanio    schedule 06.10.2020
comment
Это действительно раздражает, я пытался найти решение и в итоге получил то же самое, что и вы, но в этом случае у вас не может быть повторной попытки и ситуации DLQ, и вам нужно повторить шаги для каждого flatMap, который у вас есть в вашем конвейере. - person navid_gh; 06.10.2020
comment
@navid_gh да, согласен. К счастью для моего конкретного случая использования, у нас уже есть EmitterProcessor, подключенный к Spring Cloud Stream Supplier<T>. В итоге мы просто создали собственное сообщение с полем retryCount и выполнили повторные попытки вручную. Очевидно, что это не идеально, но похоже, что с этим реактивным подходом нам приходится идти на некоторые компромиссы. - person Jon Catanio; 07.10.2020
comment
Но как вы решаете проблему весеннего сыщика, EmitterProcessor плохо работает с трассировкой, и он будет создавать новый идентификатор трассировщика при каждом вызове и не использует существующий идентификатор трассировщика из цепочки - person navid_gh; 08.10.2020

Я думаю, что проблема существует в следующем коде:

    .map(msg -> new RuntimeException("exception encountered!"))

Лямбда в вашей строке карты возвращает исключение, а не выдает его.

person toolkit    schedule 01.10.2020
comment
Мои извинения, это была опечатка с моей стороны, я обновил свой вопрос. Предполагалось, что он также будет включать throw. Проблема сохраняется: .map(msg -> throw new RuntimeException(обнаружено исключение!)) - person Jon Catanio; 02.10.2020