Spring Cloud Stream + Spring Retry, как добавить обратный вызов восстановления и отключить логику, отправляющую в DLQ?

Я использую Spring Cloud Stream + Rabbit mq binder.

В моем @StreaListener я хочу применить логику повтора для определенных исключений с помощью RetryTemplate. После того, как повторные попытки исчерпаны или возникает ошибка, не подлежащая повторному запуску, я хотел бы добавить обратный вызов восстановления, который сохранит новую запись с сообщением об ошибке в моей базе данных Postgres и завершит сообщение (перейти к следующему). Вот что у меня получилось:

  @StreamListener(Sink.INPUT)
  public void saveUser(User user) {    
    User user = userService.saveUser(user); //could throw exceptions
    log.info(">>>>>>User is created successfully: {}", user);
  }

  @StreamRetryTemplate
  public RetryTemplate myRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());

    Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
    retryableExceptions.put(ConnectionException.class, true);
    retryTemplate.registerListener(new RetryListener() {
      @Override
      public <T, E extends Throwable> boolean open(RetryContext context,
        RetryCallback<T, E> callback) {
        return true;
      }

      @Override
      public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
        Throwable throwable) {
        //could add recovery logic here, like save error to db why sertain user was not saved
        log.info("retries exausted");
      }

      @Override
      public <T, E extends Throwable> void onError(RetryContext context,
        RetryCallback<T, E> callback, Throwable throwable) {
        log.error("Error on retry", throwable);
      }
    });

    retryTemplate.setRetryPolicy(
      new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
    return retryTemplate;
  }

из свойств, у меня есть только они (без конфигурации dlq)

spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer

И после того, как количество попыток исчерпано, я получаю этот журнал.

2020-06-01 20:05:58.674  INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685  INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://[email protected]:56722/, localPort= 50728]
2020-06-01 20:05:58.697  INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration           : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)

После запуска метода закрытия RetryListener я вижу, что слушатель пытается подключиться к DLX, вероятно, для публикации сообщения об ошибке. И я не хочу, чтобы он делал это, а также каждый раз наблюдал это сообщение об ошибке в журнале.

Итак, мои вопросы:

1) Куда добавить RecoveryCalback для моего retryTemplate? Предположительно, я мог бы написать свою логику восстановления с сохранением ошибки в db в методе RetryListener # close, но определенно должен быть более подходящий способ сделать это.

2) Как настроить связыватель rabbit-mq, ​​чтобы он не отправлял сообщения в DLQ, может быть, я мог бы переопределить какой-то метод? В настоящее время после того, как количество повторных попыток исчерпано (или появляется ошибка, не подлежащая повторному выполнению), прослушиватель пытается отправить сообщение в DLX и регистрирует ошибку, из-за которой его не удалось найти. Мне не нужно отправлять какие-либо сообщения в dlq в рамках моего приложения, мне нужно только сохранить их в БД.


person vikki    schedule 01.06.2020    source источник


Ответы (1)


В настоящее время нет механизма для предоставления настраиваемого обратного вызова восстановления.

Установите republishToDlq в false (раньше было). Он был изменен на true, что неверно, если autoBindDlq ложно (по умолчанию); Я открою для этого вопрос.

Затем, когда количество повторных попыток исчерпано, исключение будет отброшено обратно в контейнер; вы можете использовать ListenerContainerCustomizer, чтобы добавить собственный ErrorHandler.

Однако данные, которые вы получите, будут ListenerExecutionFailed исключением с необработанным (неконвертированным) Spring AMQP Message в его свойстве failedMessage, а не в вашем User объекте.

ИЗМЕНИТЬ

Вы можете добавить слушателя в канал ошибок привязки ...

@SpringBootApplication
@EnableBinding(Sink.class)
public class So62137618Application {

    public static void main(String[] args) {
        SpringApplication.run(So62137618Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @ServiceActivator(inputChannel = "user-topic.user-consumer.errors")
    public void errors(String in) {
        System.out.println("Retries exhausted for " + new String((byte[]) in.getFailedMessage().getPayload()));
    }

}
person Gary Russell    schedule 01.06.2020
comment
Смотрите редактирование моего ответа. Добавление такого обработчика канала ошибок заменяет стандартную обработку ошибок. - person Gary Russell; 01.06.2020
comment
Я добавил republishToDlq = false и @ServiceActivator, но ничего не изменилось. Ошибки не попадают в канал ошибок, я пробовал раньше. И после истощения я получаю org.springframework.messaging.MessagingException. Я что-то упускаю? Также мне не пришла часть обработчика ошибок, зачем она мне? Разве ошибки не должны сразу попадать в канал ошибок? - person vikki; 01.06.2020
comment
Извините - я сделал поправку - вам нужно извлечь данные из исключения. Обработчик ошибок по умолчанию публикует сообщения в DLX / DLQ; добавление метода обработчика ошибок предотвращает это. - person Gary Russell; 02.06.2020
comment
Спасибо @Gary Russel, это сработало! И даже без настройки обработчика ошибок! Теперь он не пытается отправить сообщение в dlq, он сразу переходит в канал ошибки :) - person vikki; 02.06.2020