Flink пересылать файлы из List‹String› filePaths

У нас есть список путей к файлам из таблицы БД с отметкой времени ее создания. Попытка выяснить, как мы можем использовать список путей к файлам из db для пересылки только тех файлов из nfs в приемник kafka.

Прямо сейчас я использую настраиваемую версию ContinuousFileMonitoringFunction с корнем папки, которая будет содержать все файлы, которые покажет БД. Эта операция выполняется очень медленно, так как выполняется просмотр папки для сбора информации об обновленных файлах, поскольку папка слишком велика и содержит несколько ТБ данных.

Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));

DataSet<String> ds  = result.toDataSet();

ds содержит все пути к файлам, которые следует отправить в kafka.

Ниже представлена ​​идея, которую я планирую реализовать. Но есть ли более эффективный подход с учетом параллелизма flink, поддержки библиотеки flink и т. д.?

public class FileContentMap extends RichFlatMapFunction<String, String> {

      

    @Override
    public void flatMap(String input, Collector<String> out) throws Exception {

       
       
        // get the file path
        String filePath = input;

        String fileContent = readFile(input);

    out.collect(fileCOntent);

       
    }

    @Override
    public void open(Configuration config) {
       
    }
}

DataSet<String> contectDataSet = ds.map(new FileCOntentMap());

contectDataSet.addSink(kafkaProducer);

person VSK    schedule 12.08.2020    source источник


Ответы (1)


Ваш подход кажется мне хорошим. Возможно, более эффективным было бы создание RichParallelSourceFunction, где в методе open() вы делаете вызов БД, чтобы получить список файлов, которые были обновлены, и вы создаете в памяти список файлов, которые этот конкретный источник под- задача (что-то вроде filePath.hashCode() % numSubTasks == mySubTask) должна быть обработана вашим FileContentMap.

person kkrugler    schedule 18.08.2020