Spring Integration Гарантированная доставка сообщений JMS с использованием DSL

Я пытаюсь создать поток (1), в котором сообщение получено от адаптера TCP, который может быть клиентом или сервером, и он отправляет сообщение брокеру ActiveMQ.

Мой другой поток (2) выбирает сообщение из требуемой очереди и отправляет по назначению

TCP (клиент / сервер) == (1) ==> ActiveMQ Broker == (2) ==> Адаптер исходящего HTTP-трафика

Я хочу убедиться, что в случае, если мое сообщение не будет доставлено в требуемое место назначения, оно повторно попытается отправить сообщение еще раз.

Мой текущий поток (1) к брокеру:

IntegrationFlow flow = IntegrationFlows
            .from(Tcp
                    .inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
                            .serializer(customSerializer).deserializer(customSerializer)
                            .id("server").soTimeout(5000))
                    .id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
            .wireTap("tcpInboundMessageLogChannel").channel(directChannel())
            .handle(Jms.outboundAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .get();

    this.flowContext.registration(flow).id("outflow").register();

и Мой поток (2) от брокера к исходящему http:

flow = IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .channel(directChannel())
            .handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class)
                    .mappedRequestHeaders("abc"))
            .get();
    this.flowContext.registration(flow).id("inflow").register();

Проблема:

  • В случае какого-либо исключения во время доставки, например, мой целевой URL-адрес не работает, он повторно пытается отправить сообщение.

  • После неудачной попытки повторить попытку 7 раз, т.е. max attempt to 7

  • Если попытка по-прежнему не удалась, он отправляет сообщение в ActiveMQ.DLQ (очередь недоставленных сообщений) и не повторяет попытку снова, поскольку сообщение удаляется из реальной очереди и отправляется в ActiveMQ.DLQ.

Итак, мне нужен сценарий, при котором сообщение не будет потеряно, а сообщение будет обрабатываться по порядку.


person shivam tiwari    schedule 26.06.2018    source источник


Ответы (1)


Во-первых: я считаю, что вы можете настроить jmsInbound для бесконечных повторных попыток:

/**
 * Configuration options for a messageConsumer used to control how messages are re-delivered when they
 * are rolled back.
 * May be used server side on a per destination basis via the Broker RedeliveryPlugin
 *
 * @org.apache.xbean.XBean element="redeliveryPolicy"
 *
 */
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {

С другой стороны, вы можете настроить .handle(Http.outboundChannelAdapter( для RequestHandlerRetryAdvice для аналогичного поведения повторных попыток, но внутри приложения без циклических обращений к JMS и обратно: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice

Вот пример того, как это можно настроить с точки зрения Java DSL:

    @Bean
    public IntegrationFlow errorRecovererFlow() {
        return IntegrationFlows.from(Function.class, "errorRecovererFunction")
                .handle((GenericHandler<?>) (p, h) -> {
                    throw new RuntimeException("intentional");
                }, e -> e.advice(retryAdvice()))
                .get();
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
        return requestHandlerRetryAdvice;
    }

    @Bean
    public MessageChannel recoveryChannel() {
        return new DirectChannel();
    }

RequestHandlerRetryAdvice можно настроить с помощью RetryTemplate для применения чего-то вроде AlwaysRetryPolicy. См. Проект Spring Retry для получения дополнительной информации: https://github.com/spring-projects/spring-retry

person Artem Bilan    schedule 26.06.2018