Обработка потоков в клиенте Java HornetQ

Я пытаюсь понять, как работать с потоками в Java-клиенте, который подключается к HornetQ. Я не получаю конкретной ошибки, но не понимаю, как я должен работать с потоками в первую очередь (относительно клиента HornetQ и, в частности, MessageHandler.onMessage() - потоки в целом для меня не проблема).

Если это актуально: я использую 'org.hornetq:hornetq-server:2.4.7.Final' для запуска сервера, встроенного в мое приложение. Я не собираюсь что-то менять. В моей ситуации это просто удобнее с точки зрения эксплуатации, чем запуск отдельного серверного процесса.

Что я сделал до сих пор:

  1. создать встроенный сервер: new EmbeddedHornetQ(), .setConfiguration()

  2. создать локатор серверов: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

  3. создать фабрику сеансов: serverLocator.createSessionFactory()

Теперь мне кажется очевидным, что я могу создать сеанс, используя hornetqClientSessionFactory.createSession(), создать производителя и потребителя для этого сеанса и обрабатывать сообщения в одном потоке, используя .send() и .receive().

Но я также обнаружил consumer.setMessageHandler(), и это говорит мне о том, что я вообще не понимал многопоточность в клиенте. Я пытался использовать его, но затем потребитель вызывает messageHandler.onMessage() в двух потоках, отличных от того, который создал сеанс. Кажется, это совпадает с моим впечатлением от просмотра кода — клиент HornetQ использует пул потоков для отправки сообщений.

Это оставляет меня в замешательстве. В javadocs говорится, что сеанс является "single-thread object", и код соглашается — там не происходит очевидной синхронизации. Но когда onMessage() вызывается в нескольких потоках, message.acknowledge() также вызывается в нескольких потоках, и этот один просто делегирует сеанс. Как это должно работать? Как будет выглядеть сценарий, в котором MessageHandler НЕ обращается к сеансу из нескольких потоков?

Далее, как мне отправлять последующие сообщения из onMessage()? Я использую HornetQ для постоянной очереди задач, поэтому отправка последующих сообщений является типичным использованием случай для меня. Но опять же, в пределах onMessage() я нахожусь в неправильном потоке для доступа к сеансу.

Обратите внимание, что я могу держаться подальше от MessageHandler и просто использовать send() / receive() таким образом, чтобы я мог контролировать потоки. Но я убежден, что вообще не понимаю всей ситуации, и это в сочетании с многопоточностью просто напрашивается на неприятности.


person Martin Geisse    schedule 06.06.2016    source источник
comment
Эти последующие сообщения: знаете ли вы заранее адресатов, которым вы собираетесь их отправить? Являются ли последующие пункты назначения статическими или динамическими?   -  person Tair    schedule 10.06.2016
comment
Адрес и очередь такие же, как и для исходного сообщения — единая очередь единиц работы. Не уверен, что вы подразумеваете под статическим/динамическим. Они являются динамическими в том смысле, что первоначальная рабочая единица должна быть обработана, чтобы знать, какие последующие сообщения должны быть сгенерированы.   -  person Martin Geisse    schedule 10.06.2016
comment
Я имею в виду, почему вы беспокоитесь о потокобезопасности сеанса, если вы можете передать производителя обработчику сообщений (вместо сеанса).   -  person Tair    schedule 10.06.2016
comment
По двум причинам: во-первых, похоже, что производитель (ClientProducerImpl) делегирует методы сеанса без какой-либо дополнительной синхронизации. Например, от send() к doSend() к session.startCall(). Во-вторых, даже без создания каких-либо последующих сообщений мне все равно приходится подтверждать каждое сообщение, у которого та же проблема: оно делегируется сеансу без какой-либо дополнительной синхронизации.   -  person Martin Geisse    schedule 10.06.2016
comment
onMessage() предназначен для нескольких одновременных потребителей, каждый потребитель имеет свой собственный сеанс. Например, если вам нравится исходный код, отметьте org.springframework.jms.listener.DefaultMessageListenerContainer. Для перераспределения сообщения в той же очереди вы можете создать новое сообщение со свойством _HQ_SCHED_DELIVERY. Опять же, я предлагаю использовать org.springframework.jms.core.JmsTemplate для отправки сообщений, это забота о соединениях, сессиях и так далее.   -  person user1516873    schedule 10.06.2016
comment
Код в DefaultMessageListenerContainer использует JMS API, а не HornetQ API, так что мне это особо не помогло. Вы говорите, что у каждого потребителя есть свой сеанс — как я могу получить доступ к этому сеансу? Метод onMessage() вызывается HornetQ в потоках, запущенных HornetQ без каких-либо параметров. _HQ_SCHED_DELIVERY должен отображать сообщение в определенное время в будущем, и я не понимаю, как это связано с проблемой потоковой передачи. Я не использую ни JMS, ни Spring, поэтому JmsTemplate недоступен.   -  person Martin Geisse    schedule 10.06.2016
comment
Я проверил, связаны ли сообщения с разными потребителями на основе потока, в котором они вызываются (следуя вашему утверждению, что onMessage предназначен для нескольких одновременных потребителей, каждый со своим сеансом). Однако этого вовсе не происходит. Сообщения, переданные в onMessage() в разных потоках, все связаны обратно с одним и тем же потребителем, таким образом, используя один и тот же сеанс.   -  person Martin Geisse    schedule 10.06.2016


Ответы (1)


Я могу ответить на часть вашего вопроса, хотя я надеюсь, что вы уже устранили проблему.

Сформируйте документацию HornetQ. на ClientConsumer (выделено мной):

ClientConsumer получает сообщения из очередей HornetQ.
Сообщения можно получать синхронно с помощью методов receive(), которые будут блокироваться до тех пор, пока сообщение не будет получено (или истечет время ожидания), или асинхронно, установив MessageHandler.
Эти 2 типа потребления являются исключительными: ClientConsumer с набором MessageHandler выдаст HornetQException, если будут вызваны его методы Receive().

Таким образом, у вас есть два варианта обработки приема сообщений:

  1. Synchronize the reception yourself
    • Do not provide a MessageListener to HornetQ
    • В своем собственном потоке cunsumer вызовите .receive() или .receive(long itmeout) на досуге.
    • Retrieve the (optional) ClientMessage object returned by the call
      • Pro: Using the Session you hopefully carry in the Consumer you can forward the message as you see fit
      • Против: вся эта обработка сообщений будет последовательной.
  2. Delegate Thread synchronization to HornetQ
    • Do not invoke .receive() on a Consumer
    • Provide a MessageListener implementation of onMessage(ClientMessage)
      • Pro: All the message handling will be concurrent and fast, hassle-free
      • Против: я не думаю, что возможно получить Session из этого объекта, так как он не отображается через интерфейс.
    • Непроверенный обходной путь: в своем приложении (которое, как и ваше, находится в виртуальной машине), я открыл лежащий в основе потокобезопасный QueueConnection в качестве статической переменной, доступной для всего приложения. Из вашего MessageListener вы можете вызвать на нем QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);, чтобы получить новый сеанс и отправить из него свои сообщения... Вероятно, это хорошо насколько я могу см., потому что объект Session на самом деле не создается заново. Я также сделал это, потому что сеансы имели тенденцию устаревать.

Я не думаю, что вы должны так сильно хотеть управлять своими потоками выполнения сообщений, особенно временными потоками, которые просто пересылают сообщения. Как вы уже догадались, HornetQ имеет встроенные пулы потоков и эффективно повторно использует эти объекты.

Также, как вы знаете, вам не нужно находиться в одном потоке для доступа к объекту (например, к очереди), поэтому не имеет значения, осуществляется ли доступ к очереди через несколько потоков или даже через несколько сеансов. Вам нужно только убедиться, что сеанс доступен только одному потоку, и это предусмотрено дизайном MessageListener.

person MrBrushy    schedule 06.10.2016