Я использую Spring Cloud Stream + Rabbit mq binder.
В моем @StreaListener я хочу применить логику повтора для определенных исключений с помощью RetryTemplate. После того, как повторные попытки исчерпаны или возникает ошибка, не подлежащая повторному запуску, я хотел бы добавить обратный вызов восстановления, который сохранит новую запись с сообщением об ошибке в моей базе данных Postgres и завершит сообщение (перейти к следующему). Вот что у меня получилось:
@StreamListener(Sink.INPUT)
public void saveUser(User user) {
User user = userService.saveUser(user); //could throw exceptions
log.info(">>>>>>User is created successfully: {}", user);
}
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(ConnectionException.class, true);
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
//could add recovery logic here, like save error to db why sertain user was not saved
log.info("retries exausted");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
log.error("Error on retry", throwable);
}
});
retryTemplate.setRetryPolicy(
new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
return retryTemplate;
}
из свойств, у меня есть только они (без конфигурации dlq)
spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer
И после того, как количество попыток исчерпано, я получаю этот журнал.
2020-06-01 20:05:58.674 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://[email protected]:56722/, localPort= 50728]
2020-06-01 20:05:58.697 INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)
После запуска метода закрытия RetryListener я вижу, что слушатель пытается подключиться к DLX, вероятно, для публикации сообщения об ошибке. И я не хочу, чтобы он делал это, а также каждый раз наблюдал это сообщение об ошибке в журнале.
Итак, мои вопросы:
1) Куда добавить RecoveryCalback для моего retryTemplate? Предположительно, я мог бы написать свою логику восстановления с сохранением ошибки в db в методе RetryListener # close, но определенно должен быть более подходящий способ сделать это.
2) Как настроить связыватель rabbit-mq, чтобы он не отправлял сообщения в DLQ, может быть, я мог бы переопределить какой-то метод? В настоящее время после того, как количество повторных попыток исчерпано (или появляется ошибка, не подлежащая повторному выполнению), прослушиватель пытается отправить сообщение в DLX и регистрирует ошибку, из-за которой его не удалось найти. Мне не нужно отправлять какие-либо сообщения в dlq в рамках моего приложения, мне нужно только сохранить их в БД.