Итак, у меня есть проект весенней загрузки, в котором мне нужно сделать это:
- Start Transaction -A JDBC poller, который читает строки со статусом TO_SEND, -Send a Jms для каждой строки, -Update status 'SENT' -Commit Transaction or rollback on failure
Сервер представляет собой Weblogic с источником данных XA для обрабатываемых строк, фабрикой XA для Jms, контекстом jndi и опросчиком интеграции Spring (jdbcpollingchaneladapter) и транзакцией jta:
Как указано здесь, в этом документе, в для этого мне нужно использовать JtaTransaction с userTransaction и создать нетранзакционный сеанс Jms
// DATABASE Poller using JdbcPollingChannelAdapter
@Bean
@InboundChannelAdapter(channel = "jpaInputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<?> jpaInbound() {
// Select request by status = 'TO_SEND'
JdbcPollingChannelAdapter j = new JdbcPollingChannelAdapter(datasource,
StgOutJmsRepository.FIND_FILTER_BY_STATUS_SQL);
StgOutJms stg = new StgOutJms();
j.setRowMapper(stg);
return j;
}
//Poller metadata with jta Transaction
@Bean
public PollerMetadata pollerMetadata() throws NamingException {
return Pollers.fixedDelay(Long.valueOf(env.getProperty("poller.interval")))
.transactional(transactionManager).get();
}
Jta Transaction manager с использованием userTransaction:
@Bean
public PlatformTransactionManager transactionManager() throws NamingException {
Hashtable<String, String> properties = new Hashtable<>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
UserTransaction xact = (UserTransaction) vInitialContext.lookup("javax.transaction.UserTransaction");
return new JtaTransactionManager(xact);
}
Процесс :
// Service Activator : Lunching the Jms Creation for each row
@Bean
@ServiceActivator(inputChannel = "jpaInputChannel")
public MessageHandler handler() {
return wlstoreMessage -> {
try {
jmsSenderService.
consumeMessage((List<StgOutJms>) wlstoreMessage.getPayload());
} catch (NamingException | JMSException e) {
log.error(e.getMessage(), e);
}
};
}
@Override
public void consumeMessage(List<StgOutJms> stgEntityList) throws NamingException, JMSException {
logger.info("JMS: Consume messages");
for (StgOutJms stgOutEntity : stgEntityList) {
if (nonNull(stgOutEntity) && nonNull(stgOutEntity.getIdentifiantUniqueLot())) {
sendMessage(stgOutEntity);
stgOutEntity.setStatus("SENT");
repositoryOut.save(stgOutEntity);
} else {
logger.error("The id of the object received is null");
}
}
}
JMS соединение:
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_SEND));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_SEND));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_SEND));
vQueueSender = vQueueSession.createSender(vQueue);
}
Проблема с этим кодом заключается в том, что сообщения Jms отправляются в транзакции (фиксация при успехе, откат при ошибке), но отправленный статус никогда не обновляется (crudrepository).
Кроме того, я пробовал использовать jpaTransactionManager, он хорошо работает для сохранения базы данных, но сообщения Jms отправляются до фиксации транзакции (без отката jms при сбое).
Буду признателен за помощь!