Spring AMQP StatefulRetryOperationsInterceptor не используется

Я пытаюсь настроить spring amqp, чтобы повторять сообщение только определенное количество раз. В настоящее время сообщение, которое не работает, например. из-за DataIntegrityViolationException повторно доставляется на неопределенный срок.

Согласно документации здесь я придумал следующую конфигурацию

@Bean
    public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateful()
                .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
                .maxAttempts(3)
                .messageKeyGenerator(message -> UUID.randomUUID().toString())
                .build();
    } 

Похоже, это не применяется - сообщения все еще проверяются на неопределенный срок.

Такое ощущение, что я что-то здесь упускаю.

Вот моя оставшаяся конфигурация относительно AMQP:

@Bean
    Queue testEventSubscriberQueue() {
        final boolean durable = true;
        return new Queue("testEventSubscriberQueue", durable);
    }

    @Bean
    Binding binding(TopicExchange topicExchange) {
        return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(testEventSubscriberQueue().getName());
        container.setMessageListener(listenerAdapter);
        container.setChannelTransacted(true);
        return container;
    }


    @Bean
    MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
        listenerAdapter.setMessageConverter(messageConverter);
        return listenerAdapter;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper objectMapper) {
        final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        jsonMessageConverter.setJsonObjectMapper(objectMapper);
        DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
        defaultClassMapper.setDefaultType(EventPayload.class);
        jsonMessageConverter.setClassMapper(defaultClassMapper);
        final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
        messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
        return messageConverter;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        //rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

@Bean
    public TopicExchange testExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }

Я использую spring-amqp 1.5.1.RELEASE.

Любая помощь приветствуется.


person Mathias Dpunkt    schedule 09.11.2015    source источник


Ответы (1)


Вам нужно настроить контейнер, чтобы добавить перехватчик в свою цепочку рекомендаций...

container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
person Gary Russell    schedule 09.11.2015
comment
Обратите внимание, что использование случайного UUID в качестве ключа сообщения не очень полезно — мы не можем отслеживать количество повторных попыток; ключ сообщения должен быть чем-то уникальным в сообщении. Если исходной системой является Spring AMQP, ее можно настроить для установки идентификатора сообщения в заголовках сообщений. Если у вас нет уникального идентификатора, рассмотрите возможность повторной попытки без сохранения состояния, которая не включает повторную доставку. - person Gary Russell; 09.11.2015