Я хотел бы прочитать папку hdfs, содержащую файлы avro с spark. Затем я хотел бы десериализовать события avro, содержащиеся в этих файлах. Я хотел бы сделать это без библиотеки com.databrics (или любой другой, позволяющей легко это сделать).
Проблема в том, что у меня проблемы с десериализацией.
Я предполагаю, что мой файл avro сжат с помощью snappy, потому что в начале файла (сразу после схемы) у меня есть
avro.codecsnappy
написано. Затем следуют читаемые или нечитаемые символы.
Моя первая попытка десериализации события avro следующая:
public static String deserialize(String message) throws IOException {
Schema.Parser schemaParser = new Schema.Parser();
Schema avroSchema = schemaParser.parse(defaultFlumeAvroSchema);
DatumReader<GenericRecord> specificDatumReader = new SpecificDatumReader<GenericRecord>(avroSchema);
byte[] messageBytes = message.getBytes();
Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord genericRecord = specificDatumReader.read(null, decoder);
return genericRecord.toString();
}
Эта функция работает, когда я хочу десериализовать файл avro, в котором нет avro.codecsbappy. Когда это так, у меня есть ошибка:
Неправильные данные: отрицательная длина: -50
Поэтому я попробовал сделать это по-другому:
private static void deserialize2(String path) throws IOException {
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
DataFileReader<GenericRecord> fileReader =
new DataFileReader<>(new File(path), reader);
System.out.println(fileReader.getSchema().toString());
GenericRecord record = new GenericData.Record(fileReader.getSchema());
int numEvents = 0;
while (fileReader.hasNext()) {
fileReader.next(record);
ByteBuffer body = (ByteBuffer) record.get("body");
CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
System.out.println("Positon of the index " + body.position());
System.out.println("Size of the array : " + body.array().length);
String bodyStr = decoder.decode(body).toString();
System.out.println("THE BODY STRING ---> " bodyStr);
numEvents++;
}
fileReader.close();
}
и возвращает следующий результат:
Положение индекса 0
Размер массива: 127482
СТРОКА ТЕЛА --- ›
Я вижу, что массив не пуст, а просто возвращает пустую строку.
Как я могу продолжить?