Заполнение темы ActiveMQ.Advisory.Consumer.Queue с использованием весеннего верблюда и атомикоса

Интересно, сталкивался ли кто-нибудь с этой проблемой при использовании комбинации Atomikos + Camel + ActiveMQ. Я использую эту комбинацию, чтобы отделять сообщения от очереди в транзакционном режиме. Кажется, это работает хорошо. Проблема в том, что сейчас я нахожусь в ситуации, когда мне нужно включить консультативные сообщения в ActiveMQ. После того, как я это сделал, я заметил, что все очереди постоянно пересоздают соединения. Об этом свидетельствует флуд топиков ActiveMQ.Advisory.Consumer.Queue. Это также заметно в журнале DEBUG, поскольку он постоянно создает соединение, открывает транзакцию, фиксирует ее и закрывает соединение. Это происходит без каких-либо реальных сообщений, генерируемых приложением. Все остальные очереди/темы без транзакций не имеют этой проблемы. Я читал в нескольких других сообщениях, что пул соединений и кэширование могут решить эту проблему. Похоже, мне не следует использовать кэширование, и я уже использую пул соединений. Я использую этот конфиг:

<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="poolSize">
        <value>4</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
    init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
            <prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
        </props>
    </constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close" depends-on="userTransactionService">
    <property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

Он использует AtomikosConnectionFactoryBean, который, как я думал, реализует объединение. Может я ошибаюсь в этом? Я хотел бы услышать, есть ли кто-нибудь еще в этой лодке со мной, и что они сделали, чтобы исправить это.

@PeterSmith предложил реализацию

Петтер, спасибо за ваше предложение. Я изменил свою конфигурацию, чтобы использовать XaPooledConnectionFactory. Весна этим недовольна. Он считает, что XaPooledConnectionFactory не реализует XAConnectionFactory.

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="maxPoolSize">
        <value>32</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsPooledConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
     init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean> 
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>

java.lang.IllegalStateException: невозможно преобразовать значение типа [org.apache.activemq.pool.XaPooledConnectionFactory] в требуемый тип [javax.jms.XAConnectionFactory] для свойства «xaConnectionFactory»: не найдены подходящие редакторы или стратегия преобразования

В документах указано, что класс XaPooledConnectionFactory реализует javax.jms.XAConnectionFactory, поэтому я немного растерялся. Вроде должно работать.


person user2001388    schedule 22.01.2013    source источник
comment
Очень старая проблема, но вы в конце концов заставили ее работать? Можете ли вы обновить решение здесь.   -  person Varesh    schedule 05.08.2015
comment
это происходит и со мной, но с Spring Integration JMS.   -  person OhadR    schedule 15.04.2019


Ответы (1)


Я также видел это, используя приведенную выше комбинацию.

Похоже, что Spring DMLC (который реализует потребителя jms для верблюдов) использует цикл Consumer.receive(), чтобы иметь возможность включить операцию чтения в транзакцию XA.

Без кэширования XA в DMLC или даже без опроса SMLC возможно.

Попробуйте обернуть свой CF activemq в org.apache.activemq.pool.PooledConnectionFactory и посмотрите, поможет ли это.

person Petter Nordlander    schedule 23.01.2013
comment
Спасибо @peter. Я обновил исходный пост с вашим предложением. Дайте мне знать, что вы думаете. - person user2001388; 06.02.2013