Я определил схему AVRO и создал несколько классов с помощью avro-tools для схем. Теперь я хочу сериализовать данные на диск. Я нашел несколько ответов о scala для этого, но не для Java. Класс Article
создан с помощью avro-tools и создан на основе схемы, определенной мной.
Вот упрощенная версия кода того, как я пытаюсь это сделать:
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String fileContent = fileNameContent._2();
// An object from my avro schema
Article a = new Article(fileContent);
Processing processing = new Processing();
// .... some processing of the content here ... //
processing.serializeArticleToDisk(avroFileName);
return a;
});
где serializeArticleToDisk(avroFileName)
определяется следующим образом:
public void serializeArticleToDisk(String filename) throws IOException{
// Serialize article to disk
DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
dataFileWriter.create(this.article.getSchema(), new File(filename));
dataFileWriter.append(this.article);
dataFileWriter.close();
}
где Article
моя схема avro.
Теперь картограф выдает мне ошибку:
java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)
. . . rest of the stacktrace ...
хотя путь к файлу правильный.
После этого я использую метод collect()
, поэтому все остальное в функции map
работает нормально (кроме части сериализации).
Я новичок в Spark, поэтому я не уверен, что это может быть что-то тривиальное на самом деле. Я подозреваю, что мне нужно использовать некоторые функции записи, а не делать запись в маппере (хотя не уверен, что это правда). Любые идеи, как решить эту проблему?
ИЗМЕНИТЬ:
Последняя строка трассировки стека ошибок, которую я показал, на самом деле находится в этой части:
dataFileWriter.create(this.article.getSchema(), new File(filename));
Это та часть, которая выдает фактическую ошибку. Я предполагаю, что dataFileWriter
нужно заменить чем-то другим. Любые идеи?