У нас есть список путей к файлам из таблицы БД с отметкой времени ее создания. Попытка выяснить, как мы можем использовать список путей к файлам из db для пересылки только тех файлов из nfs в приемник kafka.
Прямо сейчас я использую настраиваемую версию ContinuousFileMonitoringFunction с корнем папки, которая будет содержать все файлы, которые покажет БД. Эта операция выполняется очень медленно, так как выполняется просмотр папки для сбора информации об обновленных файлах, поскольку папка слишком велика и содержит несколько ТБ данных.
Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));
DataSet<String> ds = result.toDataSet();
ds содержит все пути к файлам, которые следует отправить в kafka.
Ниже представлена идея, которую я планирую реализовать. Но есть ли более эффективный подход с учетом параллелизма flink, поддержки библиотеки flink и т. д.?
public class FileContentMap extends RichFlatMapFunction<String, String> {
@Override
public void flatMap(String input, Collector<String> out) throws Exception {
// get the file path
String filePath = input;
String fileContent = readFile(input);
out.collect(fileCOntent);
}
@Override
public void open(Configuration config) {
}
}
DataSet<String> contectDataSet = ds.map(new FileCOntentMap());
contectDataSet.addSink(kafkaProducer);