Необходимо проверить подход: Spring Integration + AMQP + Async

Я недавно исследовал Spring Integration и AMQP (RabbitMQ), так как мне нужно связать два приложения (промежуточное ПО и бэкэнд) с помощью асинхронного подхода, чтобы промежуточное ПО не блокировалось при получении клиентских вызовов.

Сначала я применил более простой подход - реализовать это в синхронном режиме, это означает, что у меня есть интерфейс шлюза и шлюз исходящей почты (с requireReply = true) на промежуточном программном обеспечении, а затем шлюз входящей почты и активатор службы на бэкэнде. Этот первоначальный подход работает хорошо (я использовал XML-конфигурацию Spring Integration).

Теперь мне нужно пояснить, как сделать эту работу асинхронной.

Посмотрев на RabbitMQ Tutorial 6, лучше работать с очередью обратного вызова и correlationId, и, насколько я понял, это будет похоже на вызов Spring RabbitTemplate convertAndSend () и затем receive () вместо convertSendAndReceive () (который будет заблокирован до получения ответа).

Я проверил документы Spring Integration, где мне нужно заменить интерфейс шлюза на промежуточном программном обеспечении, чтобы он возвращал Future или ListenableFuture.

Асинхронный Шлюз

После этого я также просмотрел документацию для исходящий шлюз, где говорится, что он может работать вместе с RabbitTemplate для управления атрибутами сообщения correlationID и replyTo.

Мои вопросы:

  1. Чтобы эта работа работала с использованием асинхронного подхода, следует ли мне продолжать работать со шлюзами исходящей / входящей почты, а не с конвертерами исходящих / входящих сообщений?
  2. В случае использования подхода конвертеров исходящих / входящих сообщений (который мне кажется похожим на то, что показано в учебнике RabbitMQ), как мне связать Future на интерфейсе шлюза с результатом, возвращаемым с помощью адаптера входящего канала?

person Franco Guidoli    schedule 29.03.2015    source источник
comment
Я забыл упомянуть, что я уже проверял это еще одна запись, которая указывает мне на документы RabbitTemplate.   -  person Franco Guidoli    schedule 29.03.2015


Ответы (1)


Честно говоря, вы не предоставляете исходных бизнес-требований. Это может быть фактом, что на самом деле нет причин иметь дело с этим async хендофф, потому что у вас есть @Gateway в качестве точки входа, которая не имеет потоков, и даже если она заблокирована для ожидания ответа, это не влияет на другие потоки, которые могут выполнять аналогичную sendAndReceive операцию. В большинстве случаев действительно достаточно делать все в одном потоке запросчика и не терять производительность при переходе на общий ThreadPoolExecutor.

Правильно, Future позволяет вам немного освободить вызывающего, чтобы он был готов принимать новые запросы в том же потоке.

Поскольку это MessagingGateway, и вы все равно хотите получить ответ, с запросом связан перехватчик - заголовок TemporaryReplyChannel. Вот почему этот <outbound-gateway> работает правильно: он помещает свой блокирующий ответ на этот канал для return шлюза (или для FutureTask#set()).

Я бы сказал, что мы можем добиться того же TemporaryReplyChannel выигрыша с вашим требованием асинхронного ответа.

  1. Вы должны использовать пару _12 _ / _ 13_ канальных адаптеров.
  2. Перед отправкой сообщения <int-amqp:outbound-channel-adapter> вы должны сделать это <header-channels-to-string> для <header-enricher>.
  3. На стороне сервера может быть то же самое - <int-amqp:inbound-gateway>
  4. Вы должны использовать фиксированный replyQueue в качестве заголовка сообщения, отправляемого через <int-amqp:outbound-channel-adapter>
  5. <int-amqp:inbound-channel-adapter> должен быть настроен для этого фиксированного replyQueue.
  6. И <int-amqp:outbound-channel-adapter> на стороне клиента, и <int-amqp:inbound-gateway> должны быть настроены для mapped-request-headers="*", чтобы разрешить распространение этого заголовка reply-channel на сервер и наоборот.
  7. <int-amqp:inbound-channel-adapter> на стороне клиента просто отправит ответ reply-channel, как и для <int-amqp:outbound-gateway>
  8. Возможно, вам придется позаботиться о correlationId вручную, поскольку <int-amqp:inbound-gateway> может потребоваться это для правильного ответа.

Ну типа того...

HTH

Не стесняйтесь задавать больше вопросов. Или поправьте меня, если я неправильно понял ваш вопрос.

person Artem Bilan    schedule 30.03.2015
comment
Спасибо Артему за ответ. Я согласен с тем, что, вероятно, требованиям не нужен асинхронный подход (сейчас я нахожусь на проверке концепции, и я хотел бы знать, могу ли я использовать асинхронный подход, если это необходимо). В общем, мой вопрос заключается в том, должен ли я сохранить исходящий шлюз на производителе (который, как мне кажется, блокируется до получения ответа), или я должен заменить его отдельным адаптером исходящего канала + адаптером входящего канала? Спасибо еще раз. - person Franco Guidoli; 30.03.2015
comment
Извините, вы еще чего-нибудь от меня ждете? - person Artem Bilan; 01.04.2015
comment
Нет, спасибо Артем. Я просто использую исходящий шлюз вместо его замены адаптерами исходящего / входящего каналов. Спасибо, Франко. - person Franco Guidoli; 02.04.2015
comment
Что ж, хороший тон в таком случае, чтобы принять ответ, чтобы закрыть тему - person Artem Bilan; 02.04.2015