Spring Retry Template блокирует мои очереди ответов

ЧЕГО Я ПЫТАЮСЬ ДОСТИЧЬ У меня есть вызов REST из пользовательского интерфейса, который вызывает добавление пользователя. Таким образом, пользователю придется создать асинхронную очередь (это ограничение), но затем дождаться очереди ответов в течение настроенного времени и обработать ее, прежде чем результат будет отправлен обратно в пользовательский интерфейс. Если очередь возвращается с пустым ссылочным номером, тогда я должен удалить запись пользователя и создать исключение, говорящее, что пользователь недействителен. Если ответ возвращается с действительной ссылкой (или если происходит тайм-аут), я считаю его действительным и возвращаю успех.

У меня есть приложение, в котором я отправляю сообщение очереди, чтобы получить referenceNumber для моего пользовательского объекта. Затем дождитесь ответа очереди, прежде чем ответить на вызов REST. Но мне нужно дождаться настроенного времени, чтобы ответ очереди вернулся.

UserManagerImpl

// REST CALL to persist
public User Persist(User user) {
...
...
 // Building the message for sending to QUEUE
 UserEnvelopeV1_0 userEnvelope =buildUserEnvelope(user);
// This is the place i send the queue message
userQueueClient.send(userEnvelope);
// Update Request time
updateRequestDetails(user.getUserId);
// This is the call i am going retry
boolean userValid = userRetryTemplate.doUserReferenceRetry(userId);
if (!userValid ) {
                  //remove User Object
                  throw Exception
                }
...
}

// update the request time for reference Number
private void updateRequestDetails(String userId) {
 User user = userRepository.findById(userId);
        if (user != null) {
            user.setRefRequestDateItem(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }

public void updateReference(String userId, String referenceNumber) {

        User user = userRepository.findById(userId);
        if (user != null) {
            user.setReference(referenceNumber);
            user.setResponseDate(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }
    }

UserQueueClient:

@Component
public class UserQueueClient {



    @JmsListener(id = "#{T(java.util.UUID).nameUUIDFromBytes('${in.res}",
            destination = "${in.res}", containerFactory = "containerFactory")
    public void receive(Message message, UserEnvelopeV1_0 envelope) throws{


        try {
            String userId = envelope.getHeader().getMessageIdentification().getUserId();
 ApplicationInformationStructure applicationInformation = envelope.getBody().getApplicationInformation();

if(CollectionUtils.isNotEmpty(applicationInformation.getApplicationInformationResult())) {
          String referenceNumber = applicationInformation.getApplicationInformationResult().getRefNumber();      

                userManager.updateReference(userId, referenceNumber);
            }

        } catch (Exception e) {
            //
        }
    }

    @Transactional(propagation = Propagation.MANDATORY)
    public void send(UserEnvelopeV1_0 sarsSoapEnvelope) throws JMSException {


        envelope.setHeader();

        Message message = sendToQueue(envelope, requestQueue, responseQueue,
                userId);

        applicationEventPublisher.publishEvent(new MessageLogEvent("USER_GET_REF_NUMBER", message, MessageType.XML,
                requestQueue, MessageDirection.SEND, true, false, new Date(), userId));

    }
}

Шаблон повторной попытки пользователя



@Component
public class UserRetryTemplate {


    @Value("${retry.max.attempts:5}")
    private int maxAttempts;

    @Value("${response.waiting.time.in.seconds:60}")
    private long maxDelay;

    @Autowired
    private UserRepository userRepository;

    private static final long INITIAL_INTERVAL = 2000L;

    public RetryTemplate retryTemplate() {

        // Max timeout in milliseconds
        long maxTimeout = maxDelay*1000;

        //double multiplier = (maxTimeout - INITIAL_INTERVAL)/((maxAttempts-2)*6000);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(maxAttempts);


        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(maxTimeout/(maxAttempts-1));

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

    public boolean doUserReferenceRetry(String userId) {
        boolean isUserReferenceValid = true;
        try {
            boolean isValidUser = retryTemplate().execute(context -> {
                logger.info("Attempted {} times", context.getRetryCount());
                User user = userRepository.findById(userId);
                logger.info("User Retry :" + user);

                if (user.getResponseDateItem() == null || user.getReferenceNumber == null) {
                    logger.info("response not yet received");
                    throw new IllegalStateException("User Response not yet received");
                }
                if (user.getReferenceNumber != null)) {
                    return true;
                }
                throw new IllegalStateException("Response not yet received");
            });
            return isUserReferenceValid ;
        } catch (IllegalArgumentException e) {

        }
        return true;
    }

}

Итак, я реализовал логику, в которой я отправлю сообщение очереди и сделаю повторную попытку Spring (в течение настроенного времени), чтобы проверить базу данных, обновлен ли referenceNumber в БД. Кроме того, когда ответ очереди вернется, я обновлю БД с помощью referenceNumber.

Но когда я реализовал приведенную выше логику, повторная попытка spring повторяется до настроенного времени, но мое приложение Spring не обрабатывает никаких очередей ответов. Есть ли способ, которым приложение Spring может запускать оба процесса параллельно.

Проблема в том, что если я удаляю механизм весеннего повтора, очередь ответов обрабатывает мой ответ и обновляет запись пользователя с помощью ссылочного номера.

Но когда я добавил логику повтора, очередь ответов больше не обрабатывает мою очередь.


person Dev    schedule 01.04.2020    source источник


Ответы (1)


Я нашел ниже строчку запутанной.

"где я отправлю сообщение очереди и сделаю повторную попытку Spring (в течение настроенного времени), чтобы проверить базу данных, обновлен ли referenceNumber в БД. Кроме того, когда ответ очереди вернется, я обновлю БД с помощью referenceNumber ."

В одной строке вы говорите, что ожидаете обновления ссылочного номера, а в другой строке вы говорите, что обновляете базу данных. Кто здесь продюсер? есть две разные нити? Производитель и Потребитель в данном случае вы.

Если вы хотите заблокировать текущий поток на настроенное время. Можете ли вы рассмотреть возможность блокировки очереди с помощью метода опроса (длительный тайм-аут, блок TimeUnit)

poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

Пожалуйста, отредактируйте вопрос с достаточной информацией.

person Datta Diware    schedule 01.04.2020
comment
Отредактировал мой вопрос с деталями. Моя цель - обработать ответ очереди, который возвращается с номером ссылки, и я обновляю его в БД, в то же время я делаю повторную попытку, чтобы проверить, обновлена ​​ли БД с номером ссылки. - person Dev; 01.04.2020
comment
Вам нужно вызвать механизм Retry в отдельном потоке. Можете ли вы попробовать использовать Async Retry? - person Datta Diware; 02.04.2020
comment
Да, я пробовал, но асинхронная повторная попытка не ждет завершения повторной попытки, она отправляет ответ обратно в пользовательский интерфейс до завершения повторной попытки (я предполагаю, что это потому, что это асинхронная повторная попытка) - person Dev; 02.04.2020