Реактивные потоки Apache Camel с Bindy - только чтение первой строки

Я пытаюсь использовать реактивные потоки Apache Camel (версия 2.25.3) в сочетании с Spring Boot для чтения большого файла CSV и демаршалинга строк с помощью Bindy. Это работает в том смысле, что приложение запускается и обнаруживает файлы по мере их появления, но тогда я вижу только первую строку файла в моем потоке. Похоже, это связано с Бинди, потому что, если я уберу демаршалинг из уравнения, я верну все строки файла csv в моем потоке очень хорошо. Я упростил задачу, чтобы продемонстрировать здесь, на SO. Я использую Spring Webflux, чтобы открыть полученный Publisher.

Итак, мой маршрут Camel выглядит следующим образом:

import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
    private final CamelReactiveStreamsService camelRs;

    @Override
    public void configure() {
        var bindy = new BindyCsvDataFormat(LineItem.class);

        from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
                .unmarshal(bindy)
                .to("reactive-streams:lineItems");
    }

    public Flux<LineItem> getLineItemFlux() {
        Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);

        return Flux.from(lineItems);
    }
}

Класс Бинди:

@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField =true)
public class LineItem {
    @DataField(pos = 2)
    private String description;
}

И конечная точка для открытия потока:

@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return lineItemFlux;
}

Итак, когда я сейчас делаю локон:

curl localhost:8080/lineItems

Я возвращаю только первую строку, тогда как когда я удаляю строку .unmarshal (bind) (и реорганизую поток, чтобы он имел тип String вместо LineItem), я возвращаю все элементы файла csv.

Поэтому я полагаю, что я не использую Bindy правильно в контексте реактивных потоков. Я следил за этой документацией по Camel и попытался переписать свою маршрут следующим образом:

from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
        .to("reactive-streams:rawLines");

from("reactive-streams:rawLines")
        .unmarshal(bindy)
        .to("reactive-streams:lineItems");

Он показывает, что маршруты запущены правильно:

2021-01-04 10:13:26.798  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Total 2 routes, of which 2 are started

Но затем я получаю исключение о том, что у потока нет активных подписок:

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport      ] [         9]
[route1            ] [to1               ] [reactive-streams:rawLines                                                     ] [         5]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalStateException: The stream has no active subscriptions
    at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]

Есть ли у кого-нибудь указатели, как я могу использовать Bindy в сочетании с реактивными потоками? Спасибо!

ИЗМЕНИТЬ

После очень полезного поста от burki я смог исправить свой код. Итак, определение маршрута изменилось на следующее. Как видите, я удалил шаг демаршализации, поэтому он просто забирает файлы из файловой системы по мере их поступления и помещает их в реактивный поток:

@Override
public void configure() {
    from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
            .to("reactive-streams:extractedFile");
}

А затем выставьте файловый поток как Flux:

public Flux<File> getFileFlux() {
    return Flux.from(camelRs.fromStream("extractedFile", File.class));
}

И код для синтаксического анализа CSV выглядит следующим образом (с использованием OpenCSV, как предлагается Burki, но с использованием другой части API):

private Flux<LineItem> readLineItems() {
    return fileFlux
            .flatMap(message -> Flux.using(
                    () -> new CsvToBeanBuilder<LineItem>(createFileReader(message)).withSkipLines(1)
                            .withSeparator(';')
                            .withType(LineItem.class)
                            .build()
                            .stream(),
                    Flux::fromStream,
                    BaseStream::close)
            );
}

private FileReader createFileReader(File file) {
    System.out.println("Reading file from: " + file.getAbsolutePath());
    try {
        return new FileReader(file);
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}

Теперь вы можете предоставить этот результирующий Flux как конечную точку:

@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return readLineItems();
}

И теперь, когда вы выполняете завиток, как я сделал выше, вы получаете полные немаршалированные LineItems из csv.

У меня все еще есть задача, действительно ли это загружает весь файл в память или нет. Я так не думаю, я думаю, что получаю только указатель на файл, который затем передаю в bean-компонент OpenCSV, но мне нужно это проверить, может быть, теперь я сначала читаю весь файл в память, а затем транслирую его что приведет к поражению цели.


person Jonck van der Kogel    schedule 04.01.2021    source источник


Ответы (1)


Я предполагаю, что потребитель файла просто передаст весь файл на этап демаршаллинга.

Поэтому, если вы демаршалируете результат потребителя файла до LineItem, вы уменьшите все содержимое файла до первой строки.

Если вы, наоборот, удалите немаршалинг, вы получите все содержимое файла. Но, вероятно, потребитель файла загрузил весь файл в память, прежде чем передать его.

Но чтение всего файла - это не то, что вам нужно. Чтобы прочитать файл CSV построчно, вам необходимо разделить файл в потоковом режиме.

from("file:...")
    .split(body().tokenize(LINE_FEED)).streaming()
    .to("direct:processLine") 

Таким образом, Splitter отправляет каждую строку в маршрут direct:processLine для дальнейшей обработки.

Проблема, с которой я столкнулся в этом сценарии, заключалась в том, чтобы проанализировать одну строку CSV. Большинство библиотек CSV предназначены для чтения и анализа файлов целиком, а не отдельных строк.

Однако довольно старая библиотека OpenCSV имеет _ 4_ с parseLine(String csvLine) методом. Поэтому я использовал это для анализа полностью отдельной строки CSV.

person burki    schedule 06.01.2021
comment
Отлично, спасибо бурки, собираюсь проверить! - person Jonck van der Kogel; 06.01.2021
comment
Это был намек, что мне нужны бурки, еще раз спасибо! Я опубликую свое решение в редакции выше для других, кто может найти его интересным. - person Jonck van der Kogel; 06.01.2021