Как вручную подтвердить сообщения RabbitMQ в Spring Cloud Stream?

Для потоковых служб я хочу, чтобы сообщение оставалось в очереди, когда базовая служба, вызванная в @StreamListener, не работает. С этой целью, насколько я понимаю, единственный способ сделать это - настроить spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL.

После внесения этого изменения конфигурации я попытался добавить @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag в качестве аргументов метода к моей существующей реализации @StreamListener, как описано в https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack. При наличии этого кода я обнаружил следующее исключение:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:941)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:851)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'amqp_channel' for method parameter type [interface com.rabbitmq.client.Channel]
    at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100)
    at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)

Затем я обнаружил следующее: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples, в котором показан пример того, как выполнять подтверждение сообщений с помощью Kafka, но в настоящее время я использую привязку RabbitMQ. Мы планируем в конечном итоге перейти на Kafka, но на данный момент, как мне настроить и закодировать решение для ручного подтверждения сообщений для успешно обработанных сообщений и ручного отклонения сообщений, таким образом оставляя сообщение в очереди при возникновении исключений. В настоящее время я использую Spring Cloud Edgware.RELEASE и Spring Cloud Stream Ditmars.RELEASE.

ОБНОВЛЕНИЕ

Теперь у меня такая конфигурация:

spring:
  cloud:
    stream:
      bindings:
        do-something-async-reply:
          group: xyz-service-do-something-async-reply
      rabbit:
        bindings:
          do-something-async-reply:
            consumer:
              autoBindDlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000
              requeueRejected: true

И я получаю следующую ошибку при запуске службы:

2018-01-12 14:46:34.346 ERROR [xyz-service,,,] 2488 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'do-something-async-reply.xyz-service-do-something-async-reply' in vhost '/': received the value 'DLX' of type 'longstr' but current is none, class-id=50, method-id=10)

Какая конфигурация неправильная / мне не хватает?


person Keith Bennett    schedule 10.01.2018    source источник


Ответы (1)


Имя свойства неверно; вам не хватает .rabbit. Это

spring.cloud.stream. кролик .bindings.<channel>.consumer.acknowledge-mode = РУЧНОЙ

поскольку это свойство зависит от кроликов - см. документация.

ИЗМЕНИТЬ

Пример:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So481977082Application {

    public static void main(String[] args) {
        SpringApplication.run(So481977082Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(AmqpHeaders.CHANNEL) Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println(in);
        Thread.sleep(60_000);
        channel.basicAck(tag, false);
        System.out.println("Ackd");
    }

}

Имейте в виду, что необходимость в ручных подтверждениях часто возникает из-за запаха; как правило, лучше позволить контейнеру обрабатывать подтверждения; см. requeueRejected по той же ссылке на документ. Безусловная повторная постановка в очередь может вызвать бесконечный цикл.

РЕДАКТИРОВАТЬ2

У меня отлично работает ...

@SpringBootApplication
@EnableBinding(Processor.class)
public class So48197708Application {

    public static void main(String[] args) {
        SpringApplication.run(So48197708Application.class, args);
    }

    @Bean
    ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(new GenericMessage<>("foo"));
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Header(name = "x-death", required = false) List<?> death) {
        System.out.println(death);
        throw new RuntimeException("x");
    }

}

с участием

spring:
  cloud:
    stream:
      bindings:
        input:
          group: foo
          content-type: application/json
          destination: foo
          consumer:
            max-attempts: 1
        output:
          content-type: application/json
          destination: foo
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
              dlqDeadLetterExchange:
              dlqTtl: 10000

Результат:

null
...
Caused by: java.lang.RuntimeException: x
...
[{reason=expired, count=1, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
    {reason=rejected, count=1, exchange=foo, time=Fri Jan 12 17:20:18 EST 2018, routing-keys=[foo], queue=foo.foo}]
...

...
[{reason=expired, count=3, exchange=DLX, routing-keys=[foo.foo], time=Fri Jan 12 17:20:28 EST 2018, queue=foo.foo.dlq}, 
    {reason=rejected, count=3, exchange=foo, routing-keys=[foo], time=Fri Jan 12 17:20:18 EST 2018, queue=foo.foo}]
person Gary Russell    schedule 11.01.2018
comment
Гэри, если для параметра RequeueRejected установлено значение true, будет ли сообщение автоматически повторно поставлено в очередь при возникновении исключения из @StreamListener? Это все, что нужно сделать? - person Keith Bennett; 11.01.2018
comment
Хорошо, я нашел информацию об этом на docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/. Когда в документации указано «Установите dlqDeadLetterExchange на обмен по умолчанию», каково значение для обмена по умолчанию? - person Keith Bennett; 11.01.2018
comment
Любое исключение, кроме AmqpRejectAndDontRequeueException, приведет к повторной постановке сообщения в очередь, если повторная попытка отключена. Если повторная попытка включена, связыватель вызовет это исключение, когда количество повторных попыток исчерпано. Также есть republishToDlq место, где связыватель опубликует ошибку, включая заголовки, содержащие информацию об ошибке. Обмен по умолчанию - пустая строка. Все очереди привязаны к этому обмену с их именем очереди в качестве ключа маршрутизации. - person Gary Russell; 11.01.2018
comment
Хорошо, я считаю, что моя конфигурация правильно определена в соответствии с docs.spring.io/spring-cloud-stream/docs/Ditmars.SR2/reference/, но я получаю сообщение об ошибке, задокументированное в ОБНОВЛЕНИИ моего исходного сообщения. Я пробовал внести некоторые изменения / дополнения, но не могу понять, что я указал неправильно. - person Keith Bennett; 12.01.2018
comment
Как показано в разделе Все вместе вам просто нужно использовать пустое значение для dlqDeadLetterExchange not ''. - person Gary Russell; 13.01.2018
comment
Я удалил '', как это отражено в ОБНОВЛЕНИИ, и все еще получаю ту же ошибку. Я просто добавил конфигурацию spring.cloud.stream.bindings... для справки и в свое ОБНОВЛЕНИЕ. Ошибка указывает, что проблема связана с group каналом (xyz-service-do-something-async-reply), на который я хочу отправить сообщение. Конфигурация этого канала находится в xyz-service.yml. - person Keith Bennett; 13.01.2018
comment
См. Мое второе изменение для рабочего примера. Поскольку ваша очередь уже существует, вы должны удалить ее; вы не можете изменить аргументы в существующей очереди. - person Gary Russell; 13.01.2018
comment
Спасибо за пример. Я посмотрю на это. Похоже, у меня есть какая-то проблема с курицей / яйцом. После удаления очереди ошибка больше не отображается при запуске службы потребителя. Однако, когда я запускаю службу производителя, отображается та же ошибка. На это же имя очереди в поставщике ссылаются следующим образом: spring.cloud.stream.bindings.do-something-async-reply.producer.required-groups: xyz-service-do-something-async-reply. Что интересно, так это со стороны потребителей, я видел received the value 'DLX' of type 'longstr' but current is none, а теперь наоборот. - person Keith Bennett; 13.01.2018
comment
Сейчас вижу со стороны производителя received none but current is the value 'DLX' of type 'longstr'. - person Keith Bennett; 13.01.2018
comment
Гэри, я только что понял, что мне нужны следующие конфигурации на стороне производителя: spring.cloud.stream.rabbit.bindings.do-something-async-reply.producer.auto-bind-dql:true, spring.cloud.stream.rabbit.bindings.do-something-async-reply.producer.dlq-dead-letter-exchange: и spring.cloud.stream.rabbit.bindings.do-something-async-reply.producer.dlq.ttl: 10000. На данный момент никаких ошибок ни со стороны потребителя, ни со стороны производителя. Спасибо за вашу помощь! - person Keith Bennett; 13.01.2018
comment
Да, если у вас есть required-groups на стороне производителя (что вызывает объявление очереди потребителя), конфигурация привязки очереди должна соответствовать конфигурации потребителя. Обычно лучше отделить потребителя от производителя, но required-groups необходимо, если вы можете развернуть нового производителя перед новым потребителем. - person Gary Russell; 13.01.2018