Отправка сообщения в определенную очередь RabbitMQ с использованием SpringIntegration Java DSL

Я работаю над приложением Java, которое использует Spring Boot версии 2.0.4.RELEASE и RabbitMQ версии 3.7.7. Приложение кэширует все сообщения из RabbitMQ в базе данных Redis и должно повторно отправлять их, когда в RabbitMQ создается новая очередь. В настоящее время мне удалось зафиксировать создание очереди, используя плагин обмена событиями, а также имя очереди. Я использую исходящий адаптер AMQP для отправки сообщений обратно в RabbitMQ.

OutFlow

public IntegrationFlow outFlow(AmqpTemplate amqpTemplate) {
    return IntegrationFlows.from(outputChannel())
            .handle(Amqp.outboundAdapter(amqpTemplate)
                    .routingKeyExpression("headers.routingKey")
                    .exchangeNameExpression("headers.exchange"))
            .get();
}

Я могу отправлять сообщения на конкретный обмен с помощью routingKey. Но я не знаю, как настроить имя очереди в исходящем адаптере. Чтобы я мог отправить сообщение в эту конкретную очередь.


person Vimal David    schedule 20.08.2018    source источник
comment
IIRC, сопоставление routingKeys с очередями - это то, что вы настраиваете на сервере RabbitMQ, а не в отправителе ... Таким образом, отправителю нужно только знать, какой правильный routingKey им следует использовать, а фактическая очередь скрыта за обменом .. .   -  person moilejter    schedule 20.08.2018
comment
да, отправителю нужно знать только «routingKey» и «exchangeName». В моем сценарии мне нужно отправить в определенную очередь, а не во все очереди, связанные с этим конкретным обменом. Если возможно, я с удовольствием отправлю сообщения прямо в Очередь, минуя обмен.   -  person Vimal David    schedule 20.08.2018
comment
Но вы можете настроить обмен RabbitMQ так, чтобы он знал, как направить конкретный routingKey в определенную очередь - тогда ваш клиент, выбрав правильный routingKey, увидит, что его сообщения попадают в правильную очередь ...   -  person moilejter    schedule 20.08.2018
comment
Да, мы могли бы привязать обмен к очереди с помощью routingKey. К сожалению, моя спецификация ранее просила не создавать очереди. Таким образом, RabbitMQ создает новую очередь, когда новый клиент подключается и подписывается на сообщение. Таким образом, к одному обмену может быть привязано несколько очередей. Но когда создается новая очередь, я хочу отправлять сообщения в эту конкретную очередь, а не все очереди, привязанные к этому обмену.   -  person Vimal David    schedule 21.08.2018
comment
Это что-то похожее на этот вопрос stackoverflow.com/questions/43408096/ Вместо RabbitTemplate я использую исходящий адаптер Spring DSL AMQP   -  person Vimal David    schedule 21.08.2018
comment
Разве вы не хотите просто создать ключ маршрутизации для каждой очереди, когда вы их регистрируете, а затем как-то делиться этими ключами со всеми клиентами? Таким образом, они будут использовать правильный ключ маршрутизации для правильной очереди, и обмен доставит его в соответствующую очередь?   -  person moilejter    schedule 21.08.2018
comment
да, мы уже делаем это для отправки сообщений. Сценарий, который у нас есть сейчас, заключается в отправке кэшированных сообщений. Производитель отправит сообщения в RabbitMQ, и они будут кэшированы. Он будет кэшировать все сообщения. С другой стороны, у нас есть клиент MQTT. когда клиент устанавливает новое соединение, он создает новую очередь. Таким образом, мы должны фильтровать кешированные сообщения на основе routingKey и отправлять их в очередь, минуя обмен.   -  person Vimal David    schedule 21.08.2018
comment
@VimalDavid, взгляните на этот ответ: stackoverflow.com/questions/18531308/   -  person Rod    schedule 12.09.2018
comment
Спасибо, @Rod. Прочитав ответ, я понял, что производитель никогда не отправляет сообщения напрямую в очередь. Но у нас было типичное требование отправлять сообщения в определенную очередь. У службы кэша есть несколько потребителей с каждой отдельной Очередью, и сообщения кэша должны быть отправлены конкретному потребителю. Следующий вопрос StackOverflow также требует того же ответа stackoverflow.com/questions/17766928/   -  person Vimal David    schedule 12.09.2018
comment
@VimalDavid, но согласно ответу, который я связал, есть обходной путь, который позволяет делать то, что вы хотите. Я еще не тестировал его ... в нем говорится: каждая очередь автоматически привязывается к обмену по умолчанию AMQP с именем очереди в качестве ключа маршрутизации. Обмен по умолчанию также известен как безымянный обмен, то есть его имя - пустая строка. Таким образом, если вы публикуете на бирже с ключом маршрутизации, равным имени вашей очереди, сообщение попадет именно в эту очередь.   -  person Rod    schedule 12.09.2018
comment
@Rob Да, мы попробовали эту реализацию. Отправка сообщений с использованием обмена по умолчанию заменит значение «routing_key» в сообщении на имя очереди. В нашем сценарии мы должны сохранить в сообщении «routing_key». Со стороны потребителя у нас есть несколько клиентов MQTT, которые фильтруют сообщения с помощью routing_key.   -  person Vimal David    schedule 13.09.2018