Я пытаюсь понять, как работать с потоками в Java-клиенте, который подключается к HornetQ. Я не получаю конкретной ошибки, но не понимаю, как я должен работать с потоками в первую очередь (относительно клиента HornetQ и, в частности, MessageHandler.onMessage()
- потоки в целом для меня не проблема).
Если это актуально: я использую 'org.hornetq:hornetq-server:2.4.7.Final'
для запуска сервера, встроенного в мое приложение. Я не собираюсь что-то менять. В моей ситуации это просто удобнее с точки зрения эксплуатации, чем запуск отдельного серверного процесса.
Что я сделал до сих пор:
создать встроенный сервер:
new EmbeddedHornetQ(), .setConfiguration()
создать локатор серверов:
HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))
- создать фабрику сеансов:
serverLocator.createSessionFactory()
Теперь мне кажется очевидным, что я могу создать сеанс, используя hornetqClientSessionFactory.createSession()
, создать производителя и потребителя для этого сеанса и обрабатывать сообщения в одном потоке, используя .send()
и .receive()
.
Но я также обнаружил consumer.setMessageHandler()
, и это говорит мне о том, что я вообще не понимал многопоточность в клиенте. Я пытался использовать его, но затем потребитель вызывает messageHandler.onMessage()
в двух потоках, отличных от того, который создал сеанс. Кажется, это совпадает с моим впечатлением от просмотра кода — клиент HornetQ использует пул потоков для отправки сообщений.
Это оставляет меня в замешательстве. В javadocs говорится, что сеанс является "single-thread object"
, и код соглашается — там не происходит очевидной синхронизации. Но когда onMessage()
вызывается в нескольких потоках, message.acknowledge()
также вызывается в нескольких потоках, и этот один просто делегирует сеанс. Как это должно работать? Как будет выглядеть сценарий, в котором MessageHandler НЕ обращается к сеансу из нескольких потоков?
Далее, как мне отправлять последующие сообщения из onMessage()? Я использую HornetQ для постоянной очереди задач, поэтому отправка последующих сообщений является типичным использованием случай для меня. Но опять же, в пределах onMessage()
я нахожусь в неправильном потоке для доступа к сеансу.
Обратите внимание, что я могу держаться подальше от MessageHandler и просто использовать send() / receive()
таким образом, чтобы я мог контролировать потоки. Но я убежден, что вообще не понимаю всей ситуации, и это в сочетании с многопоточностью просто напрашивается на неприятности.
org.springframework.jms.listener.DefaultMessageListenerContainer
. Для перераспределения сообщения в той же очереди вы можете создать новое сообщение со свойством _HQ_SCHED_DELIVERY. Опять же, я предлагаю использоватьorg.springframework.jms.core.JmsTemplate
для отправки сообщений, это забота о соединениях, сессиях и так далее. - person user1516873   schedule 10.06.2016