Spring XD: цепочка конвейеров с разделением, объединением и локальным выполнением

Я хотел бы добиться следующего:

  • Учитывая список URL-адресов (изображения в формате png), загрузите файлы в параллельных процессах (разделите)
  • Преобразование формата файла для загруженных файлов в параллельных процессах (например, png2jpg в.png из.jpg)
  • Подождите, пока все файлы будут преобразованы, затем извлеките все файлы в одно место и выполните над ними последнюю операцию (например, создайте гигантское мозаичное изображение из всех изображений).

Преобразование изображения приведено в качестве примера. Моя обработка на самом деле более сложная, но я попытался выполнить требование, что мне нужно разделить обработку файлов и что мне нужно выполнить окончательную обработку после операции соединения.

У меня есть базовое представление о том, как связаны каналы Spring XD (аналогично каналам оболочки). Есть сплиттер и агрегатор. Я ожидаю, что смогу передавать файлы как объекты в следующий канал.

Однако как бинарные объекты, коллекции и операции разделения/агрегирования сочетаются в XD?

Для меня было бы очень полезно, если бы вы могли предоставить мне рабочий пример (например, вместо преобразования для простоты запускать задачу оболочки «cat file1 file2 > outputfile»).

[1] https://github.com/spring-projects/spring-xd/wiki/Processors#splitter


person benjist    schedule 02.12.2014    source источник


Ответы (1)


Это «просто работает».

XD скрытно использует Spring Integration; обратитесь к его документации о сплиттерах/агрегаторах.

Сообщения Spring Integration имеют заголовки и полезную нагрузку; фреймворк ничего не знает о полезной нагрузке; это может быть что угодно. Предположительно, вы начнете со списка URL-адресов, разделите их, вызовите http, чтобы получить содержимое PNG для каждого. Вызовите какой-нибудь сервис для преобразования и отправки результатов агрегатору.

Разделитель устанавливает заголовки (correlationId, sequenceSize, sequenceNumber) для каждого сообщения. Агрегатор нижестоящего потока (по умолчанию) использует эти заголовки для повторной сборки результатов (стратегия выпуска по умолчанию заключается в том, что размер агрегированной группы соответствует заголовку sequenceSize). В вашем случае результатом будет Collection изображений; окончательный пользовательский процессор после того, как агрегатор может преобразовать этот список в вашу окончательную мозаику.

ИЗМЕНИТЬ:

source | splitter | http-client | processor1 | aggregator | processor2 | file

Где:

source - generates list of URLs
http-client - fetches the PNGs (binary payloads)
processor1 - convert to JPG
processor2 - create mosaic

Вам нужен собственный код в исходниках, процессор1 и процессор2; остальное должно быть просто конфигурацией в определении потока.

EDIT2:

Вы, вероятно, хотите, чтобы это было управляемо событиями, поэтому, возможно, это будет лучше:

http | json-to-object | splitter | http-client | processor1 | aggregator | processor2 | file

и POST ваш список URL-адресов как JSON.

Если вместо этого вы действительно хотите использовать опрашиваемый источник, подойдет настраиваемый источник, аналогичный источнику триггера...

    <int:inbound-channel-adapter channel="output"
        auto-startup="false" ref="myPojo" method="foo">
        <int:poller fixed-delay="${fixedDelay}" time-unit="SECONDS" />
    </int:inbound-channel-adapter>

    <bean id="myPojo" class="foo.UrlGenerator" />

и пусть foo() возвращает null, если нечего обрабатывать, или список URL-адресов.

person Gary Russell    schedule 02.12.2014
comment
Можете ли вы привести крошечный пример, например, для создания исходной коллекции? - person benjist; 02.12.2014
comment
Кажется, нет json-to-object. Что он должен делать? - person benjist; 03.12.2014
comment
В настоящее время нет стандартного процессора XD json-to-object, но его легко создать; аналогично object-to-json, но с использованием <int:json-to-object-transformer/> в Spring Integration. Документы здесь — прокрутите вниз до JSON. - person Gary Russell; 03.12.2014
comment
Нет причин использовать json-to-object. splitter может просто принять --expression="#jsonPath(payload, '$.*')", если ваш JSON, конечно, представляет собой массив URL... - person Artem Bilan; 03.12.2014