Spring AMQP (RabbitMQ) Ошибка отключения канала выброса

Я пытаюсь сделать channel.basicReject() повторно поставить сообщение в очередь на основе некоторого условия, создав MethodInterceptor ConsumerAdvice и добавив его в SMLC factor.setAdviceChain(new ConsumerAdvice()). У меня также есть конфигурация concurrentConsumer, для которой установлено значение 10. В тот момент, когда мое условие отклонения выполнено, я запускаю команду basicReject, и она повторно доставляется и обрабатывается другим потребителем. Во время этого процесса повторной доставки я получаю следующую ошибку:

2019-11-07 17:34:13.268 ERROR 29385 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2019-11-07 17:34:13.268 DEBUG 29385 --- [ool-2-thread-13] o.s.a.r.listener.BlockingQueueConsumer   : Received shutdown signal for consumer tag=amq.ctag-HUaN71TZUqMfLDR7k6LwGQ

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
    at java.lang.Thread.run(Thread.java:748)

Мое сообщение не теряется, но я вижу кучу вышеуказанных ошибок и не могу понять, почему это происходит. Если у кого-то есть какие-то подсказки, пожалуйста, помогите мне.

Ниже представлены журналы трассировки,

2019-11-08 02:11:31.883 TRACE 8695 --- [askExecutor-138] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,99) channel.getChannelNumber()
2019-11-08 02:11:31.883  INFO 8695 --- [askExecutor-138] c.g.s.w.consumer.advice.ArgumentUtils    : Channel number before triggering redelivery : 99 
2019-11-08 02:11:31.883 TRACE 8695 --- [askExecutor-138] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,99) channel.basicReject([2, true])
2019-11-08 02:11:31.883  INFO 8695 --- [askExecutor-138] c.g.s.w.consumer.advice.ArgumentUtils    : ==============================================================================
2019-11-08 02:11:31.883  INFO 8695 --- [askExecutor-138] c.g.s.w.consumer.advice.ConsumerAdvice   : Requeue Message attempted, status : true
2019-11-08 02:11:31.884 TRACE 8695 --- [askExecutor-138] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for message from consumer.
2019-11-08 02:11:31.884 TRACE 8695 --- [askExecutor-138] o.s.a.r.listener.BlockingQueueConsumer   : Retrieving delivery for Consumer@7783912f: tags=[[amq.ctag-eY7LN-1pSXPX8FKRBgt-ug]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,99), conn: Proxy@37ffe4f3 Shared Rabbit Connection: SimpleConnection@708dfe10 [delegate=amqp://[email protected]:5672/, localPort= 58638], acknowledgeMode=AUTO local queue size=0
2019-11-08 02:11:31.884 DEBUG 8695 --- [askExecutor-138] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.checkShutdown(BlockingQueueConsumer.java:436)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:501)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:843)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:832)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:78)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1073)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
    ... 1 common frames omitted

2019-11-08 02:11:31.884 ERROR 8695 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=90)

person Sunand Padmanabhan    schedule 08.11.2019    source источник


Ответы (2)


Вам нужно показать свой код и конфигурацию.

Похоже, что SMLC имеет конфигурацию по умолчанию для автоматического подтверждения сообщений, и этот сбой вызван тем, что вы его уже отклонили; почему вы напрямую взаимодействуете с каналом?

Вы можете просто вызвать исключение, и контейнер отклонит сообщение от вашего имени.

person Gary Russell    schedule 08.11.2019
comment
Я поделюсь проектом на github, содержащим эту ошибку, через пару дней. - person Sunand Padmanabhan; 09.11.2019

Не знаю, поможет ли это кому-то.

У меня была такая же ошибка из-за неверных входных данных. Но когда я добавил следующие свойства:

"rabbit.listener.acknowledgeMode": "MANUAL",
"rabbit.listener.defaultRequeueRejected": "true",
"rabbit.listener.prefetchCount": "1",

проблема перестала нарушать мою программу, но остановил только мой слушатель

person Zerg    schedule 29.12.2020