BackOffPolicy и SimpleRetryPolicy не действуют при введении в RetryTemplate

Я использую Spring AMQP для отправки сообщений и возможности повторять попытки для «настраиваемого» исключения. Допустим, у меня есть Receiver, который генерирует настраиваемое исключение «EventException», и для этого я хочу, чтобы было n повторных попыток (в нашем примере 5). Я также хочу, чтобы между повторными попытками была задержка в 5 секунд. Вот мой исходный код:

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    final static String queueName = "testing-queue";

    @Autowired
    AnnotationConfigApplicationContext context;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    Queue queue() {
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "dead-letter-exchange");
        Queue queue = new Queue(queueName, true, false, false, arguments);
        return queue;
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("testing-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    Queue deadLetterQueue() {
        return new Queue("dead-letter-queue", true);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange("dead-letter-exchange");
    }

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    }

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = 
                new CachingConnectionFactory("localhost");

        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        return connectionFactory;
    }

    @Bean
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter,
            RetryOperationsInterceptor interceptor) {

        Advice[] adviceChain = { interceptor };

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setAdviceChain(adviceChain);
        container.setMessageListener(listenerAdapter);

        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        MessageListenerAdapter adapter = 
                new MessageListenerAdapter(receiver, "receiveMessage");

        return adapter;
    }

    @Bean
    RetryOperations retryTemplate() {
         Map<Class<? extends Throwable>, Boolean> retryableExceptions = 
                 new HashMap<Class<? extends Throwable>, Boolean>();
        retryableExceptions.put(EventException.class, false);

        FixedBackOffPolicy backoffPolicy = new FixedBackOffPolicy();
        backoffPolicy.setBackOffPeriod(5000);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(backoffPolicy);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5, retryableExceptions));

        return retryTemplate;
    }

    @Bean
    RetryOperationsInterceptor interceptor(RetryOperations retryTemplate) {
        RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
        interceptor.setRecoverer(new CustomMessageRecover());
        interceptor.setRetryOperations(retryTemplate);

        return interceptor;
//      return RetryInterceptorBuilder
//              .stateless()
//              //.retryOperations(retryTemplate)
//              .maxAttempts(5)
//              .recoverer(new CustomMessageRecover()).build();
    }

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(queueName, "Hello from RabbitMQ!");
        context.close();
    }

    public class Receiver {

        public void receiveMessage(String message) throws Exception {
            System.out.println("!!!!!!!!Message has been recieved!!!!!!");
            throw new EventException("TESTING");
        }
    }

    public class CustomMessageRecover implements MethodInvocationRecoverer<Void> {

        @Override
        public Void recover(Object[] args, Throwable cause) {
            System.out.println("IN THE RECOVER ZONE!!!");
            throw new AmqpRejectAndDontRequeueException(cause);
        }
    }

    class EventException extends Exception {
        private static final long serialVersionUID = 1L;

        public EventException() {}

        public EventException(String message) {
            super(message);
        }
    }
}

Теперь в коде, как вы можете видеть, я использую RetryOperationsInterceptor, чтобы перехватить и проверить, какой тип исключения генерируется, и на основании этого примите решение либо повторить попытку, либо нет, вместе с задержкой между повторными попытками.

Для этого я устанавливаю backoffPolicy и retryPolicy Bean-компонента RetryTemplate и внедряю их в RetryOperationsInterceptor.

Я был бы признателен, если бы кто-нибудь мог мне помочь и рассказать, почему повторная попытка и задержка между повторными попытками не работают. Мои сообщения отправляются прямо в систему обмена недоставленными письмами, без повторных попыток и задержек.

БЛАГОДАРЮ ВАС!


person Ali Moghadam    schedule 11.04.2015    source источник


Ответы (1)


Ваша проблема здесь:

retryableExceptions.put(EventException.class, false);

Найдите код SimpleRetryPolicy:

public boolean canRetry(RetryContext context) {
    Throwable t = context.getLastThrowable();
    return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;
}

и далее:

private boolean retryForException(Throwable ex) {
    return retryableClassifier.classify(ex);
}

Поскольку вы указываете false для своего EventException, это не будет retryable. Отсюда любые попытки и откаты.

person Artem Bilan    schedule 11.04.2015
comment
Привет, Артем, я изменил его на true, но ничего не произошло. По-прежнему та же проблема. - person Ali Moghadam; 11.04.2015
comment
Я мог понять почему. Основываясь на документации. Учитывая, что пользовательские исключения будут заключены в ListenerExecutionFailedException, нам необходимо убедиться, что классификация исследует причины исключения. Классификатор по умолчанию просто смотрит на исключение верхнего уровня. Начиная с Spring Retry 1.0.3, BinaryExceptionClassifier имеет свойство traverseCauses (по умолчанию false). Если установлено значение true, он будет перебирать причины исключения до тех пор, пока не найдет совпадение или пока не будет обнаружена причина. поэтому мне пришлось добавить true к следующему retryTemplate.setRetryPolicy (new SimpleRetryPolicy (5, retryableExceptions, true)); - person Ali Moghadam; 11.04.2015
comment
Спасибо Артему Билану! :-) - person Ali Moghadam; 11.04.2015