Как записать объекты Avro в паркет с разделами на Java? Как добавить данные в тот же паркет?

Я использую Confluent KafkaAvroDerserializer для десериализации объектов Avro, отправленных через Kafka. Я хочу записать полученные данные в файл Parquet. Я хочу иметь возможность добавлять данные к тому же паркету и создавать паркет с разделами.

Мне удалось создать паркет с помощью AvroParquetWriter, но я не нашел, как добавить разделы или добавить в тот же файл:

До использования Avro я использовал spark для написания паркета. С помощью spark запись паркета с разделами и использование режима добавления было тривиальным. Должен ли я пытаться создавать Rdds из моих объектов Avro и использовать spark для создания паркета?


person Sharon Gal-Ed    schedule 14.11.2018    source источник
comment
Куда вы хотите записать эти файлы? HDFS? локально? Вы пробовали какой-либо код Spark для этого? И добавление невозможно, скорее вам следует увеличить время окна между опросами от Kafka, чтобы получить больше записей в файле.   -  person OneCricketeer    schedule 14.11.2018
comment
Я хочу записать паркет в HDFS. Я использую KafkaAvroDerserializer для десериализации сообщений Kafka в объекты Avro. Чтобы использовать Spark для написания паркетов, я думаю, мне нужно создать DataDrames из моих объектов Java Avro. Я пытаюсь найти способ сделать это.   -  person Sharon Gal-Ed    schedule 18.11.2018


Ответы (1)


Я хочу записать паркет в HDFS

Лично я бы не стал использовать Spark для этого.

Вместо этого я бы использовал HDFS Kafka Connector. Вот файл конфигурации, с которого можно начать.

name=hdfs-sink
# List of topics to read
topics=test_hdfs

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# increase to be the sum of the partitions for all connected topics
tasks.max=1 

# the folder where core-site.xml and hdfs-site.xml exist
hadoop.conf.dir=/etc/hadoop
# the namenode url, defined as fs.defaultFS in the core-site.xml
hdfs.url=hdfs://hdfs-namenode.example.com:9000

# number of messages per file
flush.size=10 
# The format to write the message values
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

# Setup Avro parser
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry.example.com:8081
value.converter.schemas.enable=true
schema.compatibility=BACKWARD

Если вы хотите, чтобы разделы HDFS основывались на поле, а не на буквальном номере «раздела Kafka», обратитесь к документации по конфигурации на странице FieldPartitioner. Если вам нужна автоматическая интеграция Hive, см. также документацию по этому вопросу.


Допустим, вы действительно хотели использовать Spark, однако вы можете попробовать AbsaOSS/ABRIS для чтения в Avro DataFrame, тогда вы сможете сделать что-то вроде df.write.format("parquet").path("/some/path") (не точный код, потому что я его не пробовал)

person OneCricketeer    schedule 18.11.2018