Я использую Spring Reactor с Spring Cloud Stream (GCP Pub/Sub Binder) и сталкиваюсь с проблемами обработки ошибок. Я могу воспроизвести проблему на очень простом примере:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
Поведение, которое я ожидаю, состоит в том, чтобы увидеть Не удалось использовать печать сообщения, однако это не то, что, по-видимому, происходит. При добавлении вызова .log()
в цепочку я вижу сигналы onNext
/onComplete
, я ожидаю увидеть сигналы onError
.
Мой фактический код выглядит примерно так:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
Я заметил, что глубоко в своем классе обслуживания я пытался выполнять обработку ошибок на своих издателях Reactor. Однако сигнал onError
не возникнет при использовании Spring Cloud Stream. Если бы я просто вызывал свою службу как таковую myService.processMessage(msg)
в модульном тесте и имитировал исключение, моя реактивная цепочка правильно распространяла бы сигналы об ошибках.
Кажется, это проблема, когда я подключаюсь к Spring Cloud Stream. Мне интересно, выполняет ли Spring Cloud Function/Stream какие-либо глобальные ошибки?
В моем нетривиальном коде я замечаю это сообщение об ошибке, которое может иметь какое-то отношение к тому, почему я не получаю сигналов об ошибках?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
Чтобы еще больше запутаться, я могу получить сигнал onError
в своей реактивной цепочке, если я переключу привязку Spring Cloud Stream на нереактивную реализацию следующим образом:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}