Для потоковых служб я хочу, чтобы сообщение оставалось в очереди, когда базовая служба, вызванная в @StreamListener
, не работает. С этой целью, насколько я понимаю, единственный способ сделать это - настроить spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL
.
После внесения этого изменения конфигурации я попытался добавить @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
в качестве аргументов метода к моей существующей реализации @StreamListener
, как описано в https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack. При наличии этого кода я обнаружил следующее исключение:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
Затем я обнаружил следующее: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples, в котором показан пример того, как выполнять подтверждение сообщений с помощью Kafka, но в настоящее время я использую привязку RabbitMQ. Мы планируем в конечном итоге перейти на Kafka, но на данный момент, как мне настроить и закодировать решение для ручного подтверждения сообщений для успешно обработанных сообщений и ручного отклонения сообщений, таким образом оставляя сообщение в очереди при возникновении исключений. В настоящее время я использую Spring Cloud Edgware.RELEASE
и Spring Cloud Stream Ditmars.RELEASE
.
ОБНОВЛЕНИЕ
Теперь у меня такая конфигурация:
spring:
cloud:
stream:
bindings:
do-something-async-reply:
group: xyz-service-do-something-async-reply
rabbit:
bindings:
do-something-async-reply:
consumer:
autoBindDlq: true
dlqDeadLetterExchange:
dlqTtl: 10000
requeueRejected: true
И я получаю следующую ошибку при запуске службы:
2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)
Какая конфигурация неправильная / мне не хватает?