Исходящие сообщения отката адаптера Amqp при исключении

Я использую весеннюю интеграцию (с адаптерами входящего / исходящего каналов) для передачи сообщений из

  • JMS в AMQP
  • AMQP в JMS
  • AMQP в AMQP

Это прекрасно работает.

Когда очередь назначения заполнена, я хотел бы остановить процесс и отправить сообщение обратно в исходную очередь (сообщение отката от канала).

Это работает нормально для JMS = ›AMQP и AMQP =› JMS, но KO для AMQP = ›AMQP.

Следующий код работает нормально (JMS = ›AMQP)

<bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
    <property name="autoStartup" value="false" />
    <property name="connectionFactory" ref="connectionFactoryCaching" />
    <property name="destination" ref="jmsQueue" />
    <property name="maxMessagesPerTask" value="1" />
    <property name="receiveTimeout" value="1" />
    <property name="backOff" ref="fixedBackOff" />
    <property name="sessionTransacted" value="true"/>
</bean>
<int-jms:message-driven-channel-adapter id="jmsIn" container="myListener" channel="channelJMS_AMQP" error-channel="processChannel1"/>


<rabbit:template    id="rabbitTemplate" 
    connection-factory="rabbitConnectionFactory"  
    mandatory="true" 
    channel-transacted="true" 
    message-converter="simpleMessageConverter"/>

<int-amqp:outbound-channel-adapter  channel="channelJMS_AMQP" 
    routing-key="RK1" 
    exchange-name="EXC1" 
    amqp-template="rabbitTemplate" 
    default-delivery-mode="PERSISTENT"/>

В лог-файле хорошо написано сообщение отката:

2020-11-19 10:08:57.860 [AMQP Connection XX.XX.XX.XX.130:5672] ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - partial tx completion, class-id=90, method-id=20)
2020-11-19 10:08:57.861 [myListener-1] DEBUG o.s.a.r.c.CachingConnectionFactory - Detected closed channel on exception.  Re-initializing: AMQChannel(amqp://[email protected]:5672/,1)
2020-11-19 10:08:57.950 [myListener-1] DEBUG o.s.a.r.c.RabbitResourceHolder - Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@68935e42 Dedicated Rabbit Connection: SimpleConnection@577ca937 [delegate=amqp://[email protected]:5672/, localPort= 41314]

У меня есть аналогичный код для AMQP = ›JMS, который отлично работает.

Но для AMQP = ›AMQP У меня проблема, сообщение потеряно, и процесс не останавливается:

<bean id="myListener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="MY_QUEUE" />
    <property name="recoveryBackOff" ref="fixedBackOffRabbitMQ"/>
    <property name="channelTransacted" value="true"></property>
</bean>
    
<int-amqp:inbound-channel-adapter   channel="channelAMQP_AMQP" 
    id="inboundChannelAdapter" 
    auto-startup="true" listener-container="myListener" error-channel="processChannel1" />  
    
<rabbit:template    id="rabbitTemplate" 
    connection-factory="rabbitConnectionFactory2"  
    mandatory="true" 
    channel-transacted="true" 
    message-converter="simpleMessageConverter"/>

<int-amqp:outbound-channel-adapter  channel="channelAMQP_AMQP" 
    routing-key="RK1" 
    exchange-name="EXC1" 
    amqp-template="rabbitTemplate" 
    default-delivery-mode="PERSISTENT"/>
    

Журнальный файл:

2020-11-19 10:15:24.260 [AMQP Connection XX.XX.XX.XX.130:5672] ERROR o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - partial tx completion, class-id=90, method-id=20)
2020-11-19 10:15:24.351 [myListener-1] DEBUG o.s.a.r.c.CachingConnectionFactory - Detected closed channel on exception.  Re-initializing: AMQChannel(amqp://[email protected]:5672/,1)
2020-11-19 10:15:24.356 [pool-4-thread-3] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received shutdown signal for consumer tag=amq.ctag--o5d3WkrF0N2aiCHul-qiA

А затем перезапуск потребителя вместо остановки и отката сообщения.

Тогда мое сообщение потеряно: - (

Есть ли у вас какие-либо рекомендации, как провести ожидаемую операцию в данном конкретном случае? Должен ли я добавить конкретное лечение?


person Eric NICOLAS    schedule 19.11.2020    source источник


Ответы (1)


У меня были ошибки в идентификаторах компонентов.

Работает во всех случаях :-)

person Eric NICOLAS    schedule 20.11.2020