Повторно ставьте сообщения AMQP кролика в хвост очереди после максимального количества попыток

В настоящее время я использую RabbitMQ с пружиной (spring-rabbit-1.2.0-RELEASE) со следующей конфигурацией:

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<!-- Asynchronous exchanges -->
<!-- Admin -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- Error Handler -->
<bean id="biErrorHandler" class="my.project.sync.BiErrorHandler" />
<!-- Message converter -->
<bean id="biMessageConverter" class="my.project.sync.BiMessageConverter"/>

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
    <property name="messageKeyGenerator" ref="biKeyGenerator" />
</bean> 

<bean id="biKeyGenerator" class="my.project.sync.BiMessageKeyGenerator"/>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="3000" />
            <property name="maxInterval" value="30000" />
        </bean>     
    </property>
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="3" />
        </bean>     
    </property>
</bean>  

 <rabbit:queue id="biSynchronizationQueue" name="BI_SYNCHRONIZATION_QUEUE" durable="true" />
<rabbit:listener-container message-converter="biMessageConverter" concurrency="1" 
    connection-factory="connectionFactory"
    error-handler="biErrorHandler"
    advice-chain="retryInterceptor"
    acknowledge="auto">
    <rabbit:listener queues="BI_SYNCHRONIZATION_QUEUE" ref="biSynchronizationService" method="handleMessage"/>
</rabbit:listener-container>

Я хочу повторно поставить сообщение в хвосте очереди после третьей попытки. Но я не могу этого сделать.

Есть у кого-то идея?

Заранее спасибо за помощь.


person Denis Cucchietti    schedule 04.02.2014    source источник


Ответы (2)


Вместо использования RejectAndDontRequeueRecoverer используйте пользовательский MessageRecoverer, который использует RabbitTemplate для отправки сообщения в конец очереди; затем бросьте AmqpRejectAndDontRequeueException, чтобы сообщение было отклонено.

Вы можете создать подкласс RejectAndDontRequeueRecoverer, отправить сообщение в recover(), а затем вызвать super.recover() (который просто вызывает исключение). Или просто сделайте всю реализацию в своей recover().

person Gary Russell    schedule 04.02.2014
comment
Спасибо, Гэри, завтра попробую и доложу! - person Denis Cucchietti; 04.02.2014
comment
К вашему сведению, мы добавили RepublishMessageRecoverer в предстоящую версию 1.3; см. последний абзац здесь: docs.spring.io/spring-amqp/docs/1.3.0.BUILD-SNAPSHOT/reference/ - person Gary Russell; 27.03.2014
comment
@GaryRussell В чем вы видите основные преимущества RepublishMessageRecoverer по сравнению с обменом недоставленными письмами? - person Ryan Walls; 27.03.2014
comment
На самом деле особой разницы нет, просто альтернатива. Однако есть несколько вещей: 1. Переизданное сообщение получает заголовки x-exception-stacktrace и x-exception-message. 2. Вы можете выполнять маршрутизацию к разным обменам / ключам маршрутизации в зависимости от содержимого сообщения (хотя для этого потребуется немного настраиваемого кодирования - подкласс RabbitTemplate и переопределение send(exchange, key, message)). 3. Вы можете дополнительно изменить сообщение (используя ту же технику). 4. .... С DLE сообщение пересылается с некоторой информацией о кроликах в заголовках, но без информации уровня приложения, связанной с ошибкой. - person Gary Russell; 27.03.2014
comment
Отличный пример ... но как я могу настроить это с помощью аннотаций и в последней версии, где предполагается, что восстановителем будет MethodInvocationRecoverer<?>? Это старое определение, написанное год назад @GaryRussell, уже недействительно :( .recoverer(new RejectAndDontRequeueRecoverer()) - person Luca Stucchi; 09.04.2015
comment
Я предлагаю вам задать для этого новый вопрос - включите свой код. - person Gary Russell; 09.04.2015

Я воспользовался этим советом, но затем пошел немного в другом направлении, применив очередь недоставленных сообщений. Для меня это было более явным, чем помещение сообщения в конец очереди.

 <rabbit:queue name="content.variantchange.queue" durable="true" queue-arguments="queueArguments"/>

<util:map id="queueArguments">
    <entry key="x-dead-letter-exchange" value="content.deadletter.topic"/>
</util:map>

  <rabbit:fanout-exchange name="content.deadletter.topic">
    <rabbit:bindings>
        <rabbit:binding queue="content.deadletter.queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:queue name="content.deadletter.queue" durable="true"/>

Создав очередь недоставленных сообщений, вы можете делать все, что захотите, через другого потребителя (включая добавление его обратно в исходную очередь).

Я также в конечном итоге использовал версию перехватчика без сохранения состояния, рекомендованную Гэри, что позволило мне не беспокоиться о генераторе идентификаторов сообщений.

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
</bean>
person Ryan Walls    schedule 27.03.2014