Я пытаюсь использовать реактивные потоки Apache Camel (версия 2.25.3) в сочетании с Spring Boot для чтения большого файла CSV и демаршалинга строк с помощью Bindy. Это работает в том смысле, что приложение запускается и обнаруживает файлы по мере их появления, но тогда я вижу только первую строку файла в моем потоке. Похоже, это связано с Бинди, потому что, если я уберу демаршалинг из уравнения, я верну все строки файла csv в моем потоке очень хорошо. Я упростил задачу, чтобы продемонстрировать здесь, на SO. Я использую Spring Webflux, чтобы открыть полученный Publisher.
Итак, мой маршрут Camel выглядит следующим образом:
import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
private final CamelReactiveStreamsService camelRs;
@Override
public void configure() {
var bindy = new BindyCsvDataFormat(LineItem.class);
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
.unmarshal(bindy)
.to("reactive-streams:lineItems");
}
public Flux<LineItem> getLineItemFlux() {
Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);
return Flux.from(lineItems);
}
}
Класс Бинди:
@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField =true)
public class LineItem {
@DataField(pos = 2)
private String description;
}
И конечная точка для открытия потока:
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
return lineItemFlux;
}
Итак, когда я сейчас делаю локон:
curl localhost:8080/lineItems
Я возвращаю только первую строку, тогда как когда я удаляю строку .unmarshal (bind) (и реорганизую поток, чтобы он имел тип String вместо LineItem), я возвращаю все элементы файла csv.
Поэтому я полагаю, что я не использую Bindy правильно в контексте реактивных потоков. Я следил за этой документацией по Camel и попытался переписать свою маршрут следующим образом:
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
.to("reactive-streams:rawLines");
from("reactive-streams:rawLines")
.unmarshal(bindy)
.to("reactive-streams:lineItems");
Он показывает, что маршруты запущены правильно:
2021-01-04 10:13:26.798 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801 INFO 26438 --- [ main] o.a.camel.spring.SpringCamelContext : Total 2 routes, of which 2 are started
Но затем я получаю исключение о том, что у потока нет активных подписок:
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport ] [ 9]
[route1 ] [to1 ] [reactive-streams:rawLines ] [ 5]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.IllegalStateException: The stream has no active subscriptions
at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
Есть ли у кого-нибудь указатели, как я могу использовать Bindy в сочетании с реактивными потоками? Спасибо!
ИЗМЕНИТЬ
После очень полезного поста от burki я смог исправить свой код. Итак, определение маршрута изменилось на следующее. Как видите, я удалил шаг демаршализации, поэтому он просто забирает файлы из файловой системы по мере их поступления и помещает их в реактивный поток:
@Override
public void configure() {
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
.to("reactive-streams:extractedFile");
}
А затем выставьте файловый поток как Flux:
public Flux<File> getFileFlux() {
return Flux.from(camelRs.fromStream("extractedFile", File.class));
}
И код для синтаксического анализа CSV выглядит следующим образом (с использованием OpenCSV, как предлагается Burki, но с использованием другой части API):
private Flux<LineItem> readLineItems() {
return fileFlux
.flatMap(message -> Flux.using(
() -> new CsvToBeanBuilder<LineItem>(createFileReader(message)).withSkipLines(1)
.withSeparator(';')
.withType(LineItem.class)
.build()
.stream(),
Flux::fromStream,
BaseStream::close)
);
}
private FileReader createFileReader(File file) {
System.out.println("Reading file from: " + file.getAbsolutePath());
try {
return new FileReader(file);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
}
Теперь вы можете предоставить этот результирующий Flux как конечную точку:
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
return readLineItems();
}
И теперь, когда вы выполняете завиток, как я сделал выше, вы получаете полные немаршалированные LineItems из csv.
У меня все еще есть задача, действительно ли это загружает весь файл в память или нет. Я так не думаю, я думаю, что получаю только указатель на файл, который затем передаю в bean-компонент OpenCSV, но мне нужно это проверить, может быть, теперь я сначала читаю весь файл в память, а затем транслирую его что приведет к поражению цели.