Интеграция Spring Асинхронное / параллельное выполнение DSL Scatter-Gather для нескольких получателей

мы пытаемся делать параллельные вызовы разным получателям, используя scatter-gather, и это работает нормально. Но второй поток получателей не запускается, если первый не завершен (отслеживается в Zipkin). есть ли способ сделать всех получателей асинхронными .. очень похожими на разделение-агрегирование с каналом исполнителя.

public IntegrationFlow flow1() {

        return flow -> flow
                .split().channel(c -> c.executor(Executors.newCachedThreadPool()))
                .scatterGather(
                        scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(flow2())
                                .recipientFlow(flow3())
                                .recipientFlow(flow4())
                                .recipientFlow(flow5()),
                        gatherer -> gatherer
                                .outputProcessor(messageGroup -> {
                                    Object request = gatherResponse(messageGroup);
                                    return createResponse(request);
                                }))
                .aggregate();
    }

Методы flow2 (), flow3 (), flow4 () - это методы с InterationFlow в качестве возвращаемого типа.

пример кода flow2():

public IntegrationFlow flow2() {
        return integrationFlowDefinition -> integrationFlowDefinition
                .enrichHeaders(
                        h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
                .transform(ele -> createRequest1(ele))                  
                .wireTap("asyncXMLLogging")
                .handle(wsGateway.applyAsHandler(endpoint1))
                .transform(
                        ele -> response2(ele));
    }

person kiran reddy    schedule 25.07.2018    source источник


Ответы (1)


Это действительно возможно с упомянутым executor channel. Все потоки получателей действительно должны начинаться с ExecutorChannel. В вашем случае вы должны изменить их все примерно так:

public IntegrationFlow flow2() {
    return IntegrationFlows.from(MessageChannels.executor(taskExexecutor()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele))
            .get();
}

Обратите внимание на IntegrationFlows.from(MessageChannels.executor(taskExexecutor())). Именно так вы можете сделать каждый подпоток асинхронным.

ОБНОВЛЕНИЕ

Для более старой версии Spring Integration без IntegrationFlow улучшений для подпотоков мы можем сделать следующее:

public IntegrationFlow flow2() {
    return integrationFlowDefinition -> integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .enrichHeaders(
                    h -> h.header(TransportConstants.HEADER_CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE))
            .transform(ele -> createRequest1(ele))                  
            .wireTap("asyncXMLLogging")
            .handle(wsGateway.applyAsHandler(endpoint1))
            .transform(
                    ele -> response2(ele));
}

Это похоже на то, что вы показываете в комментарии выше.

person Artem Bilan    schedule 25.07.2018
comment
пока я тестирую IntegrationFlows.from(MessageChannels.executor(Executors.newCachedThreadPool())). это создает другую проблему, когда from имеет имя канала; что-то вроде IntegrationFlows.from (enricherChannel) - person kiran reddy; 25.07.2018
comment
мой плохой: IntegrationFlows.from (enricherChannel) .channel (c - ›c.executor (Executors.newCachedThreadPool ())). это должно работать нормально. он передает сообщение в поток и синхронизирует остальную часть потока. понятно!!! - person kiran reddy; 25.07.2018
comment
Что ж, вам не нужен этот дополнительный канал. Этого вам вполне хватит: IntegrationFlows.from(MessageChannels.executor("enricherChannel", Executors.newCachedThreadPool())) - person Artem Bilan; 25.07.2018
comment
Получение исключения в flow2 () после добавления комбинации executor & from Вызвано: java.lang.UnsupportedOperationException: null. Прослежено до: @Override public void configure(IntegrationFlowDefinition<?> flow) { throw new UnsupportedOperationException(); } - person kiran reddy; 26.07.2018
comment
Это интересно, я изменил . recipientFlow на recipient и назвал канал, приложение запускается нормально; Но . recipientFlow по-прежнему выдает неподдерживаемое исключение - person kiran reddy; 26.07.2018
comment
Что ж, похоже, вы не используете последнюю версию Spring Integration. Итак, да: в этом случае лучше было бы пойти классическим recipient на основе имени канала из bean-компонентов подпотока. Из вас можно сделать то же, что я показываю в ОБНОВЛЕНИИ моего ответа. - person Artem Bilan; 26.07.2018
comment
Как должна выглядеть реализация taskExexecutor(), чтобы это работало? - person Chris; 13.07.2021
comment
Я обнаружил, что могу заменить taskExexecutor() внедренным экземпляром java.util.concurrent.ExecutorService. - person Chris; 13.07.2021