Я хочу обработать сообщение AMQP, а затем отправить его в другую очередь для дальнейшей обработки.
Я использую Spring Integration DSL для его архивации, как показано ниже,
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
OCRService ocrService) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
.concurrentConsumers(2))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing ocr here
amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
})
.get();
}
@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
.concurrentConsumers(4))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing some work for type processing
}).get();
}
Однако я обнаружил, что сообщение в очереди NOTE_INCOMING_QUEUE все еще не запаковано, когда новое сообщение обрабатывается в фазе процесса типа. См. Ниже снимок экрана управления rabbitmq.
Мне интересно, почему сообщение в NOTE_INCOMING_QUEUE все еще не запаковано, хотя обработчик уже был успешно выполнен. Это дизайн весенней интеграции amqp или что-то не так в моем коде?