JtaTransaction с Jdbc XA и Jms

Итак, у меня есть проект весенней загрузки, в котором мне нужно сделать это:

  • Start Transaction -A JDBC poller, который читает строки со статусом TO_SEND, -Send a Jms для каждой строки, -Update status 'SENT' -Commit Transaction or rollback on failure

Сервер представляет собой Weblogic с источником данных XA для обрабатываемых строк, фабрикой XA для Jms, контекстом jndi и опросчиком интеграции Spring (jdbcpollingchaneladapter) и транзакцией jta:

Как указано здесь, в этом документе, в для этого мне нужно использовать JtaTransaction с userTransaction и создать нетранзакционный сеанс Jms

   // DATABASE Poller using JdbcPollingChannelAdapter
    @Bean
    @InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<?> jpaInbound() {
        // Select request by status = 'TO_SEND'
        JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
                StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
        StgOutJms stg = new StgOutJms();
        j.setRowMapper(stg);
        return j;
    }

     //Poller metadata with jta Transaction
     @Bean
        public PollerMetadata pollerMetadata() throws  NamingException   {
            return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
                    .transactional(transactionManager).get();
        }

Jta Transaction manager с использованием userTransaction:

    @Bean
    public PlatformTransactionManager transactionManager() throws NamingException {
        Hashtable<String, String> properties = new Hashtable<>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
        InitialContext vInitialContext = new InitialContext(properties);    
        UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");       
        return new JtaTransactionManager(xact);
    }

Процесс :

// Service Activator : Lunching the Jms Creation for each row
    @Bean
    @ServiceActivator(inputChannel = "jpaInputChannel")
    public MessageHandler handler() {

        return wlstoreMessage -> {
            try {
                                 
                jmsSenderService.
                consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
            } catch (NamingException | JMSException e) {
                log.error(e.getMessage(), e);
            }

        };
    }

    @Override
    public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
            logger.info("JMS: Consume messages");
    
            for (StgOutJms stgOutEntity : stgEntityList) {
                if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
                  
                    sendMessage(stgOutEntity);
                    stgOutEntity.setStatus("SENT");
                    repositoryOut.save(stgOutEntity);
                } else {
                    logger.error("The id  of the object received is null");
                }
            }
        }

JMS соединение:

    @Override
    public void initQueueConnection() throws NamingException, JMSException {

        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
        InitialContext vInitialContext = new InitialContext(properties);

        QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
                .lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));

        vQueueConnection = vQueueConnectionFactory.createQueueConnection();
        vQueueConnection.start();

        vQueueSession = vQueueConnection.createQueueSession(false, 0);

        Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));

        vQueueSender = vQueueSession.createSender(vQueue);
    }

Проблема с этим кодом заключается в том, что сообщения Jms отправляются в транзакции (фиксация при успехе, откат при ошибке), но отправленный статус никогда не обновляется (crudrepository).

Кроме того, я пробовал использовать jpaTransactionManager, он хорошо работает для сохранения базы данных, но сообщения Jms отправляются до фиксации транзакции (без отката jms при сбое).

Буду признателен за помощь!


person Hafsa Ch    schedule 02.07.2020    source источник


Ответы (2)


Я не знаком с Weblogic, но в WebSphere я настроил DataSource как XA, а фабрику соединений JMS как XA. И использовал диспетчер транзакций XA из JNDI, чтобы обернуть аналогичный вашему вызову в одну глобальную транзакцию JTA.

Я также могу предложить изучить эту статью, как избежать транзакций XA: https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html

person Artem Bilan    schedule 02.07.2020

После нескольких дней исследований я нашел единственное решение - передать диспетчер транзакций опросчику jdbc и использовать JmsTemplate с session transhibited = true, чтобы фиксация / откат jms и jdbc выполнялись в одной транзакции.

  // JMS Beans
    @Bean
    public JndiTemplate jndiTemplate() {

        final Properties jndiProps = new Properties();
        jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        jndiProps.setProperty(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));

        JndiTemplate jndiTemplate = new JndiTemplate();
        jndiTemplate.setEnvironment(jndiProps);

        return jndiTemplate;
    }

    @Bean
    public JndiObjectFactoryBean queueConnectionFactory() {
        JndiObjectFactoryBean queueConnectionFactory = new JndiObjectFactoryBean();
        queueConnectionFactory.setJndiTemplate(jndiTemplate());
        queueConnectionFactory.setJndiName(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));

        return queueConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate((ConnectionFactory) queueConnectionFactory().getObject());
        jmsTemplate.setReceiveTimeout(500);
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

 @Bean
    public JndiObjectFactoryBean jmsQueueOut() {
        JndiObjectFactoryBean jmsQueue = new JndiObjectFactoryBean();
        jmsQueue.setJndiTemplate(jndiTemplate());
        jmsQueue.setJndiName(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));

        return jmsQueue;
    }

Использование опросчика путем передачи диспетчера транзакций

@Bean
    public PollerMetadata pollerMetadata() {
        return Pollers
                .fixedDelay(Long.valueOf(env.getProperty("poller.interval"))).transactional(transactionManager)
                .get();
    }

    // DATABASE Poller using JdbcPollingChannelAdapter
    @Bean
    @InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
    public MessageSource<?> jpaInbound() {

        // Select request by status = 'TO_SEND'
        JdbcPollingChannelAdapter poller = new JdbcPollingChannelAdapter(datasource,
                StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
        // RowMapper for mapping the list returned to the entity StgOutJms
        poller.setRowMapper(new StgOutJms());
        poller.setMaxRowsPerPoll(10);
        return poller;
    }

И активатор услуги

 // Service Activator : Lunching the Jms Creation for each row
    @Bean
    @ServiceActivator(inputChannel = "jpaInputChannel")
    public MessageHandler handler() {

        return wlstoreMessage -> {
            try {

                jmsSenderService.consumeMessage((Destination) jmsQueueOut().getObject(),
                        (List<StgOutJms>) wlstoreMessage.getPayload());
            } catch (NamingException | JMSException e) {
                log.error(e.getMessage(), e);
            }

        };
    }

Надеюсь, это поможет.

person Hafsa Ch    schedule 15.10.2020