Получение сообщения от rabbitmq, неспособного вернуть сообщение в многопоточной среде

Вот что делает мое приложение

1) Клиентское приложение Отправить сообщение запроса с correlationId на REQUEST_QUEUE_1 и ожидает ответного сообщения на RESPONSE_QUEUE_1, используя correlationId

2) Серверное приложение получает сообщение от REQUEST_QUEUE_1

3) Серверное приложение обрабатывает сообщение, полученное на шаге 2

4) Серверное приложение отправляет ответ на RESPONSE_QUEUE_1 асинхронно с correlationId, полученным как часть сообщения с шага 2.

5) Поток клиентского приложения с шага 1 получает ответ от RESPONSE_QUEUE_1 для correlationId.

Вот фрагмент кода Java-программы, выполняющей sendMessage и receiveMessage в очередь и из очереди как на стороне клиента, так и на стороне сервера - https://gist.github.com/graj2014/f580b35579bf8a94029d.

Этот код отлично работает при тестировании в одном запросе к клиентскому приложению с использованием jmeter. Однако одновременный запрос нескольких пользователей оставляет некоторые сообщения в неподтвержденном состоянии и соответствующие ответы Клиенту получают тайм-аут.

Проблема в методе receiveMessage с msgCorrelationId, который соответствует сообщению в очереди для подтверждения.


person GRaj    schedule 28.07.2014    source источник


Ответы (1)


Проблема здесь:

if (msgCorrelationId.toString().equals(correlationId)) {
        ....
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        break;
}

Если вы получаете сообщения, в которых есть correlationId, но этого не так много, вы просто пропускаете их из прецессии и, конечно же, теряете для них acknowledgement.

Непонятно, почему вы так усердно работаете с шаблоном request-reply, потому что RabbitTemplate.sendAndReceive() делает то, что нужно.

См. Документы и тестовый пример

person Artem Bilan    schedule 28.07.2014
comment
Я не хочу их признавать, поскольку они предназначены для использования другим потоком. Так следует ли мне называть channel.basicNack со значением Requeue true? - person GRaj; 29.07.2014
comment
Это правильно. И я не говорю, что вы должны ack их. Правильно, basicNack может помочь, но в конечном итоге вы обнаружите, что один и тот же слушатель (один из этих одновременных или даже каждый из них) начинает выполнять бесполезную работу с неинтересными сообщениями. Вот почему было бы лучше просто полагаться на TemporaryReplyQueue для каждого сообщения, как это делается в RabbitTemplate.sendAndReceive() по умолчанию - без replyQueue. Корень проблемы здесь в том, что AMQP не предоставляет параметр selector, как в JMS. - person Artem Bilan; 29.07.2014
comment
Я использовал RabbitTemplate SendAndReceive, работает. Однако когда я тестирую его под нагрузкой. Я вижу, что время отклика для одновременных запросов увеличивается с увеличением количества одновременных запросов. Допустим, с jmeter, 1 пользователь на 1000 итераций, среднее время отклика составляет 7 мс. с 2-мя пользователями на 1000, взяв в среднем соотв. время - 14 мс. Как улучшить время отклика, чтобы оно стало постоянным при одновременном запросе. - person GRaj; 30.07.2014
comment
Вам не кажется, что вы делаете это concurrency в одном потоке? - person Artem Bilan; 30.07.2014
comment
Мой пример использования заключается в том, что из браузера отправляются разные запросы, которые запускают sendAndReceive. Поэтому не понимаю, как можно использовать один поток для sendAndReceive. - person GRaj; 30.07.2014