spring amqp включает повторную попытку по умолчанию и предотвращает ее в соответствии с указанным исключением

В случае исключения A: повторные попытки конечное число раз и, наконец, когда число повторных попыток исчерпано, сообщение записывается в очередь недоставленных сообщений

В случае исключения B: просто сообщение должно быть записано в очередь недоставленных сообщений

Я пытаюсь реализовать тот же вариант использования, что и в, и я выполнил все шаги в соответствии с правильным ответом, но я вижу, что мое настраиваемое исключение переносится в ListenerExecutionFailedException. Я не могу заставить его прекратить повторные попытки для настраиваемого исключения.

Я выполнил шаги в ответах ниже spring amqp разрешает повторную попытку с помощью конфигурации и предотвращает ее в соответствии с указанным исключением

а также

Кролик Spring пытается доставить отклоненное сообщение .. все в порядке ?


@Bean
    public SimpleRetryPolicy rejectionRetryPolicy(){
        Map<Class<? extends Throwable> , Boolean> exceptionsMap = new HashMap<Class<? extends Throwable> , Boolean>();        
        exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false);//not retriable
        exceptionsMap.put(ListenerExecutionFailedException.class, true); //retriable
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3 , exceptionsMap ,true);
        return retryPolicy;
    }

    @Bean
    public RetryOperationsInterceptor workMessagesRetryInterceptor() {
        return RetryInterceptorBuilder.stateless().retryPolicy(rejectionRetryPolicy())

                //.backOffOptions(1000, 2, 10000)
                .recoverer(new RepublishMessageRecoverer(defaultTemplate, this.getDlqExchange(), this.getDlqroutingkey()))
                .build();
    }


/* My Rabbit MQ Error handler */

@Component
public class RabbitRetryHandler implements RabbitListenerErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(RabbitRetryHandler.class);

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
            ListenerExecutionFailedException exception) throws Exception {      
        if (amqpMessage.getMessageProperties().isRedelivered() || exception.getCause().getMessage().equals("DontRetry")) {          
            throw new AmqpRejectAndDontRequeueException(exception.getCause());
        } else {
            throw exception;
        }
    }
}

/* And finally my Listener */

@Override
    @RabbitListener(queues = "${queueconfig.queuename}",containerFactory = "sdRabbitListenerContainerFactory",errorHandler="rabbitRetryHandler")
    public void processMessage(Message incomingMsg) throws Exception {
        log.info("{} - Correlation ID: {} Received message: {} from {} queue.", Thread.currentThread().getId(),
                incomingMsg.getMessageProperties().getCorrelationId(), new String(incomingMsg.getBody()),
                incomingMsg.getMessageProperties().getConsumerQueue());
        try {
            performAction();
        } catch(CustomDontRequeueException cex) {
            throw cex;
        } catch (Exception ex) {
            throw ex;
        }
    }


@Override
    public void performAction() throws Exception {
        try {

        } catch (HttpClientErrorException ex) {
            if (ex.getStatusCode() == HttpStatus.NOT_FOUND || ex.getStatusCode() == HttpStatus.REQUEST_TIMEOUT) {

                throw new RuntimeException(ex);
            } else {
                throw new CustomDontRequeueException("DontRetry",ex);
            }
        }catch (Exception e) {          
            throw new CustomDontRequeueException(e);
        } 

    }

Ожидаемый результат, если выбрасывается исключение CustomDontRequeueException, сообщение не должно быть повторно поставлено в очередь.

Фактический результат, сообщение повторно ставится в очередь независимо от того, какое исключение было n количество раз, а затем сбрасывается в DLQ.


person Naveen Kumar Pandian    schedule 19.04.2019    source источник


Ответы (2)


    exceptionsMap.put(AmqpRejectAndDontRequeueException.class, false);//not retriable
    exceptionsMap.put(ListenerExecutionFailedException.class, true); //retriable

Вы не настроили CustomDontRequeueException, чтобы не повторять попытки, вы настроили AmqpRejectAndDontRequeueException.

Кроме того, вы не должны явно указывать ListenerExecutionFailedException, потому что он будет найден первым, что предотвратит обход причины.

person Gary Russell    schedule 19.04.2019
comment
Конечно, это помогло, но когда я удаляю ListenerExecutionFailedException, повторных попыток не происходило, когда это предполагалось ... поэтому я придумал другое настраиваемое исключение, exceptionsMap.put(CustomDontRequeueException.class, false);//not retriable exceptionsMap.put(CustomRequeueException.class, true);//retriable Также я удалил обработчик ошибок из слушателя, поскольку он больше не нужен ... - person Naveen Kumar Pandian; 19.04.2019

Мой измененный код:

/* Kept 2 different custom exceptions one for retry and one for not retry*/
    @Bean
    public SimpleRetryPolicy rejectionRetryPolicy(){
        Map<Class<? extends Throwable> , Boolean> exceptionsMap = new HashMap<Class<? extends Throwable> , Boolean>();        
        exceptionsMap.put(CustomDontRequeueException.class, false);//not retriable
        exceptionsMap.put(CustomRequeueException.class, true);//retriable
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3 , exceptionsMap ,true);
        return retryPolicy;
    }

    @Bean
    public RetryOperationsInterceptor workMessagesRetryInterceptor() {
        return RetryInterceptorBuilder.stateless().retryPolicy(rejectionRetryPolicy())
                .backOffPolicy(exponentialBackOffPolicy)
                .recoverer(new RepublishMessageRecoverer(defaultTemplate, this.getDlqExchange(), this.getDlqroutingkey()))
                .build();
    }

/* Listener --- removed error handler*/
@Override
    @RabbitListener(queues = "${queueconfig.signdocuments.queuename}",containerFactory = "asdrabbitListenerContainerFactory")
    public void processMessage(Message incomingMsg) throws Exception {      
        try {
            performAction();
        } catch(CustomDontRequeueException cex) {
            throw cex;
        } catch (Exception ex) {
            throw ex;
        }
    }

/* Action which throws custom exceptions depending on what exceptions they get*/

@Override
    public void performAction() throws Exception {
        try {

        } catch (HttpClientErrorException ex) {
            if (ex.getStatusCode() == HttpStatus.NOT_FOUND || ex.getStatusCode() == HttpStatus.REQUEST_TIMEOUT) {               
                throw new CustomRequeueException("Retry",ex);
            } else {                
                throw new CustomDontRequeueException("DontRetry",ex);
            }
        }catch (Exception e) {
            throw new CustomDontRequeueException("DontRetry",e);
        }       
    }
person Naveen Kumar Pandian    schedule 19.04.2019