Я пытаюсь создать поток (1), в котором сообщение получено от адаптера TCP, который может быть клиентом или сервером, и он отправляет сообщение брокеру ActiveMQ.
Мой другой поток (2) выбирает сообщение из требуемой очереди и отправляет по назначению
TCP (клиент / сервер) == (1) ==> ActiveMQ Broker == (2) ==> Адаптер исходящего HTTP-трафика
Я хочу убедиться, что в случае, если мое сообщение не будет доставлено в требуемое место назначения, оно повторно попытается отправить сообщение еще раз.
Мой текущий поток (1) к брокеру:
IntegrationFlow flow = IntegrationFlows
.from(Tcp
.inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
.serializer(customSerializer).deserializer(customSerializer)
.id("server").soTimeout(5000))
.id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
.wireTap("tcpInboundMessageLogChannel").channel(directChannel())
.handle(Jms.outboundAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.get();
this.flowContext.registration(flow).id("outflow").register();
и Мой поток (2) от брокера к исходящему http:
flow = IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.channel(directChannel())
.handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.mappedRequestHeaders("abc"))
.get();
this.flowContext.registration(flow).id("inflow").register();
Проблема:
В случае какого-либо исключения во время доставки, например, мой целевой URL-адрес не работает, он повторно пытается отправить сообщение.
После неудачной попытки повторить попытку 7 раз, т.е.
max attempt to 7
Если попытка по-прежнему не удалась, он отправляет сообщение в
ActiveMQ.DLQ
(очередь недоставленных сообщений) и не повторяет попытку снова, поскольку сообщение удаляется из реальной очереди и отправляется вActiveMQ.DLQ
.
Итак, мне нужен сценарий, при котором сообщение не будет потеряно, а сообщение будет обрабатываться по порядку.