чтение паркета в Google DataFlow с помощью AvroParquetInputFormat

Попытка прочитать простой файл Parquet в моем конвейере Google DataFlow

используя следующий код

Read.Bounded<KV<Void, GenericData>> results = HadoopFileSource.readFrom("/home/avi/tmp/db_demo/simple.parquet", AvroParquetInputFormat.class, Void.class, GenericData.class);

всегда запускать следующее исключение при запуске конвейера

IllegalStateException: не удается найти кодировщик для класса org.apache.avro.generic.GenericData

похоже, что этот метод внутри HadoopFileSource не может обрабатывать этот тип класса, как для кодера

  private <T> Coder<T> getDefaultCoder(Class<T> c) {
if (Writable.class.isAssignableFrom(c)) {
  Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
  return (Coder<T>) WritableCoder.of(writableClass);
} else if (Void.class.equals(c)) {
  return (Coder<T>) VoidCoder.of();
}
// TODO: how to use registered coders here?
throw new IllegalStateException("Cannot find coder for " + c);

}

любая помощь будет оценена

Ави


person Avi P    schedule 28.01.2017    source источник
comment
Привет, ты строишь с maven? Можно ли предоставить дамп вашего дерева зависимостей? Вот несколько инструкций: stackoverflow.com/questions/7953888/   -  person Alex Amato    schedule 30.01.2017


Ответы (1)


Это проблема с дизайном HadoopFileSource. Я бы посоветовал перейти на apache-beam или (scio), который является «версией» apache (и «будущим ") потока данных sdk. Как только вы окажетесь на балке, вы сможете:

Это будет scala (но вы можете легко перевести на java):

HDFSFileSource.from(
  input,
  classOf[AvroParquetInputFormat[AvroSchemaClass]],
  AvroCoder.of(classOf[AvroSchemaClass]),
  new SerializableFunction[KV[Void, AvroSchemaClass], AvroSchemaClass]() {
    override def apply(e: KV[Void, AvroSchemaClass]): AvroSchemaClass =
      CoderUtils.clone(AvroCoder.of(classOf[AvroSchemaClass]), e.getValue)
  }
)

которая является альтернативной версией from, которая принимает coder.

person rav    schedule 13.02.2017