Как сериализовать данные в схему AVRO в Spark (с Java)?

Я определил схему AVRO и создал несколько классов с помощью avro-tools для схем. Теперь я хочу сериализовать данные на диск. Я нашел несколько ответов о scala для этого, но не для Java. Класс Article создан с помощью avro-tools и создан на основе схемы, определенной мной.

Вот упрощенная версия кода того, как я пытаюсь это сделать:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //

    processing.serializeArticleToDisk(avroFileName);

    return a;
});

где serializeArticleToDisk(avroFileName) определяется следующим образом:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));
    dataFileWriter.append(this.article);
    dataFileWriter.close();
}

где Article моя схема avro.

Теперь картограф выдает мне ошибку:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

хотя путь к файлу правильный.

После этого я использую метод collect(), поэтому все остальное в функции map работает нормально (кроме части сериализации).

Я новичок в Spark, поэтому я не уверен, что это может быть что-то тривиальное на самом деле. Я подозреваю, что мне нужно использовать некоторые функции записи, а не делать запись в маппере (хотя не уверен, что это правда). Любые идеи, как решить эту проблему?

ИЗМЕНИТЬ:

Последняя строка трассировки стека ошибок, которую я показал, на самом деле находится в этой части:

dataFileWriter.create(this.article.getSchema(), new File(filename));

Это та часть, которая выдает фактическую ошибку. Я предполагаю, что dataFileWriter нужно заменить чем-то другим. Любые идеи?


person Belphegor    schedule 11.04.2016    source источник
comment
Возможно, ознакомьтесь с обсуждениями и ответами здесь: stackoverflow.com/ вопросы/20612571/spark-write-to-avro-file   -  person David Griffin    schedule 11.04.2016
comment
Я уже видел это, меня больше интересовал эквивалент Java. Спасибо за комментарий!   -  person Belphegor    schedule 11.04.2016


Ответы (2)


Это решение не использует фреймы данных и не выдает никаких ошибок:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

   .  .  .  .  .

// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {    
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
        job.getConfiguration());

где AvroUtils.getJobOutputKeyAvroSchema это:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
    Job job;

    try {
        job = new Job();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    AvroJob.setOutputKeySchema(job, avroSchema);
    return job;
}

Похожие вещи для Spark + Avro можно найти здесь -> https://github.com/CeON/spark-utils .

person Belphegor    schedule 13.04.2016

Похоже, вы неправильно используете Spark.

Map — это функция преобразования. Простой вызов map не вызывает вычисления RDD. Вы должны вызвать действие, например forEach() или collect().

Также обратите внимание, что лямбда, предоставленная map, будет сериализована в драйвере и передана некоторому Node в кластере.

ДОБАВЛЕНО

Попробуйте использовать Spark SQL и Spark-Avro, чтобы сохранить Spark DataFrame в формате Avro:

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("/examples/people.txt")
    .map(Person::parse);

// Apply a schema to an RDD
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class);
peopleDF.write()
    .format("com.databricks.spark.avro")
    .save("/output");
person Denis Kokorin    schedule 11.04.2016
comment
О чем вы говорите -- map абсолютно точно вызывает расчет RDD. map возвращает новый RDD со всеми элементами, пересчитанными на основе функции map. - person David Griffin; 11.04.2016
comment
@Денис Кокорин: Я потом использую collect(), так что в map уже все работает, все в порядке. Все, кроме сериализации, работает внутри функции map. - person Belphegor; 11.04.2016
comment
Возможно, он имеет в виду, что вы должны добавить foreach после map и писать там? Было бы полезно, если бы в этом ответе был пример кода. - person David Griffin; 11.04.2016
comment
@DavidGriffin это правда. Денис Кокорин: можно пример кода? Кроме того, я обновил вопрос на случай, если вам понадобится дополнительная информация. - person Belphegor; 11.04.2016
comment
Извините, что не упомянул об этом раньше. Но ваша ошибка указывает на hdfs:/...path.../avroFileName.avro. По умолчанию протокол HDFS не разрешается Java. Попробуйте использовать файловую систему Hadoop, чтобы открыть файл OutputStream. Также вам определенно не следует использовать map() для сохранения чего-либо в HDFS. Используйте foreach() или store(). - person Denis Kokorin; 11.04.2016
comment
@DenisKokorin, где эта функция store()? Я не могу найти его здесь, в классе JavaRDD spark.apache.org/docs/1.6.1/api/java/org/apache/spark/api/java/ На какой именно store() вы имеете в виду? - person Belphegor; 11.04.2016
comment
Я отредактировал свой оригинальный пост. Извините, что так ввел в заблуждение. Я написал этот ответ в спешке. - person Denis Kokorin; 11.04.2016
comment
@DenisKokorin Мне действительно нужна версия с целым текстовым файлом (как в моем коде), а не для чтения файла построчно. Кроме того, фрейм данных может не подходить для меня, так как у меня есть поля, содержащие несколько элементов (массивов). Я постараюсь приспособить ваше предложение к моим потребностям. Спасибо за помощь! - person Belphegor; 12.04.2016
comment
@DenisKokorin хорошо, ваше предложение не работает. В строке DataFrame articleDF = sqlContext.createDataFrame(filteredFiles, Article.class); я получаю java.lang.StackOverflowError. Сами отдельные файлы не большие (пару сотен КБ), так что не знаю, почему вылетает эта ошибка. Как-то обойти этот метод? - person Belphegor; 12.04.2016