Как правильно обработать сообщение amqp, а затем отправить его в другую очередь в Spring Integratin DSL

Я хочу обработать сообщение AMQP, а затем отправить его в другую очередь для дальнейшей обработки.

Я использую Spring Integration DSL для его архивации, как показано ниже,

@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
                           OCRService ocrService) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
            .concurrentConsumers(2))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing ocr here
                amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
            })
            .get();
}

@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
            .concurrentConsumers(4))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing some work for type processing
            }).get();
}

Однако я обнаружил, что сообщение в очереди NOTE_INCOMING_QUEUE все еще не запаковано, когда новое сообщение обрабатывается в фазе процесса типа. См. Ниже снимок экрана управления rabbitmq.

введите здесь описание изображения

Мне интересно, почему сообщение в NOTE_INCOMING_QUEUE все еще не запаковано, хотя обработчик уже был успешно выполнен. Это дизайн весенней интеграции amqp или что-то не так в моем коде?


person Kane    schedule 15.05.2017    source источник


Ответы (1)


Используйте Amqp.inboundAdapter() вместо шлюза - шлюз ожидает ответа, который никогда не поступит.

Шлюзы предназначены для сценариев запроса / ответа; адаптеры каналов предназначены для односторонних сценариев.

person Gary Russell    schedule 15.05.2017
comment
Гэри, ты имеешь в виду Amqp.inboundAdapter? Я попытался использовать Amqp.inboundAdapter вместо шлюза, но поведение по-прежнему неожиданное. Я установил точку останова в обработчике typeProcess, сообщение в очереди входящих все еще не было запаковано, пока сообщение не было должным образом обработано обработчиком typeProcess. - person Kane; 16.05.2017
comment
Да, извините, я имел в виду inboundAdapter(). Я не понимаю, как может произойти то, что вы описываете, контейнер во входящем адаптере автоматически подтвердит сообщение (по умолчанию), когда поток вернется в контейнер после выполнения потока. Конечно, если у вас есть точка останова, она не будет подтверждена, пока вы не отпустите поток. - person Gary Russell; 16.05.2017
comment
В моем фрагменте кода у меня есть два входящих адаптера и два обработчика. Будут ли разные обработчики использовать разные потоки для выполнения потока? Как вы сказали, сообщение будет подтверждено после возврата потока. В моем случае сообщение сначала поступает в очередь note.incoming, поток выполнен успешно, затем я отправил другое сообщение в очередь note.ocred. Я установил точку останова в обработчике очереди note.ocred, чтобы приостановить процесс, почему сообщение в очереди note.incoming все еще не распаковано? - person Kane; 16.05.2017
comment
Да, они будут выполняться в отдельных потоках. Я не могу объяснить, почему у вас все еще есть неподтвержденное сообщение во входной очереди; когда поток возвращается в контейнер, он будет подтвержден. Предлагаю вам включить ведение журнала DEBUG; если до сих пор не можете разобраться; опубликуйте пример, показывающий поведение. - person Gary Russell; 16.05.2017
comment
Гэри, это моя вина. Точка останова второй очереди остановила всю JVM, что определенно остановило подтверждение входящего сообщения. Еще один вопрос: есть ли еще элементарный способ (по сравнению с прямым вызовом amqpTemplate.convertAndSend()) для отправки сообщения в другую очередь после обработки сообщения из входящей очереди? - person Kane; 17.05.2017