Сжатые файлы последовательностей Hadoop Python

У меня есть клиент, который отправляет мне сжатые файлы последовательности Snappy для анализа. В конечном итоге я хочу поместить эти данные в файл pandas df. Формат выглядит следующим образом

>>> body_read

b'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\xff\xff\xff\xff\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\x8e\x05^N\x00\x00\x05^\x00\x00\x00F\xde\n\x00\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00r\x01\x00\x04\x00\x00\x00\x00\x8e\x08\x92\x00\x00\x10\x1a\x00\x00\x08\x8a\x9a h\x8e\x02\xd6\x8e\x02\xc6\x8e\x02T\x8e\x02\xd4\x8e\x02\xdb\x8e\x02\xd8\x8e\x02\xdf\x8e\x02\xd9\x8e\x02\xd3\x05\x0c0\xd9\x8e\x02\xcc\x8e\x02\xfc\x8e\x02\xe8\x8e\x02\xd0\x05!\x00\xdb\x05\x06\x0c\xd1\x8e\x02\xd7\x05\'\x04\xde\x8e\x01\x03\x18\xce\x8e\x02\xe7\x8e\x02\xd2\x05<\x00\xd4\x05\x1b\x04\xdc\x8e

Я думаю, что мне нужно сделать, это сначала распаковать файл с помощью python-snappy, а затем прочитать файлы последовательности. Я не уверен, какой лучший метод для чтения файлов последовательности Hadoop в python. Я также получаю ошибку при попытке распаковать этот файл

>>> body_decomp = snappy.uncompress(body_read)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ec2-user/anaconda3/lib/python3.5/site-packages/snappy/snappy.py", line 91, in uncompress
    return _uncompress(data)
snappy.UncompressError: Error while decompressing: invalid input

Что мне нужно сделать, чтобы прочитать эти файлы?


person user3456269    schedule 06.02.2018    source источник
comment
Попробуйте Pyspark...?   -  person OneCricketeer    schedule 06.02.2018
comment
Могу ли я использовать Pyspark, даже если я не использую Apache Spark? Я не хочу помещать эти файлы в кластер Hadoop, все, что я хочу сделать, это преобразовать их в pandas df и, возможно, загрузить в MySQL.   -  person user3456269    schedule 06.02.2018
comment
Вам не нужен кластер Hadoop. Spark не имеет ничего общего с HDFS или YARN. Он просто включает библиотеки для чтения файлов последовательности (и преобразования их в фрейм данных Pandas, а также для вставки в базу данных). PySpark требует библиотеки Apache Spark, да. См. stackoverflow.com/a/29498104/2308683.   -  person OneCricketeer    schedule 07.02.2018
comment
Спасибо! Я смог следовать этому руководству и заставить работать pyspark. Одно странное несоответствие, которое до сих пор ставит меня в тупик, заключается в том, что в моей искровой оболочке мой файл автоматически распаковывается:   -  person user3456269    schedule 07.02.2018


Ответы (1)


Благодаря полезным комментариям @cricket_007 и еще некоторым копаниям я смог решить эту проблему. PySpark выполнит необходимые мне задачи и сможет считывать файлы последовательности Hadoop непосредственно из местоположений S3, и это здорово. Сложная часть заключалась в настройке PySpark, и я нашел это руководство действительно полезным после того, как загрузил Apache Spark — https://markobigdata.com/2017/04/23/manipulating-files-from-s3-with-apache-spark/.

Одно странное несоответствие, которое у меня есть, заключается в том, что моя искровая оболочка автоматически распаковывает файл:

scala> val fRDD = sc.textFile("s3a://bucket/file_path")
fRDD: org.apache.spark.rdd.RDD[String] = s3a://bucket/file_path MapPartitionsRDD[5] at textFile at <console>:24

scala> fRDD.first()
res4: String = SEQ?!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable??)org.apache.hadoop.io.compress.SnappyCodec???? �Z�f�uAf���- 2D���� �Z�f�uAf���- 2D�?^N???^???F�

но PySpark не делает:

>>> from pyspark import SparkContext, SparkConf
>>> sc = SparkContext()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/06 23:00:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

>>> fRDD = sc.textFile("s3a://bucket/file_path")
>>> fRDD.first()
'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b�Z�f�uAf���-\x1d2D����\x0b�Z�f�uAf���-\x1d2D�\x05^N\x00\x00\x05^\x00\x00\x00F�'

Любые идеи, как мне заставить PySpark сделать это?

EDIT: еще раз спасибо cricket_007, вместо этого я начал использовать .sequenceFile(). Это изначально давало мне ошибку

    >>> textFile = sc.sequenceFile("s3a://bucket/file_path")
18/02/07 18:13:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.

Я смог решить эту проблему, следуя этому руководству — https://community.hortonworks.com/questions/18903/this-version-of-libhadoop-was-built-without-snappy.html. Теперь я могу прочитать файл последовательности и декомпилировать сообщение protobuf.

>>> seqs = sc.sequenceFile("s3a://bucket/file_path").values()
>>> feed = protobuf_message_pb2.feed()
>>> row = bytes(seqs.first())
>>> feed.ParseFromString(row)
>>> feed.user_id_64
3909139888943208259

Это именно то, что мне нужно. Что я хочу сделать сейчас, так это найти эффективный способ декомпилировать весь файл sequenceFile и превратить его в DataFrame, а не делать это по одной записи за раз, как я сделал выше.

person user3456269    schedule 06.02.2018
comment
Я думаю, что вывод — это просто разница в кодировании Scala и Python REPL. Сама строка не распаковывается - person OneCricketeer; 07.02.2018
comment
Также не используйте textFile. Существует специальный метод для SequenceFiles spark.apache .org/docs/latest/api/python/ - person OneCricketeer; 07.02.2018
comment
Спасибо, cricket_007, это была хорошая идея. Мне пришлось следовать этому решению, чтобы заставить быстро работать распаковку "nofollow noreferrer" title="эта версия libhadoop была собрана без snappy.html">community.hortonworks.com/questions/18903/, но теперь у меня все хорошо. Моя последняя задача — найти эффективный способ декомпилировать сообщения protobuf и превратить результаты в фрейм данных. - person user3456269; 07.02.2018