pyspark как загрузить сжатый файл snappy

Я сжал файл с помощью python-snappy и поместил его в свой магазин hdfs. Сейчас я пытаюсь прочитать это так, но получаю следующую трассировку. Я не могу найти пример того, как прочитать файл, чтобы я мог его обработать. Я могу нормально читать текстовый файл (несжатый). Должен ли я использовать sc.sequenceFile? Спасибо!

I first compressed the file and pushed it to hdfs

python-snappy -m snappy -c gene_regions.vcf gene_regions.vcf.snappy
hdfs dfs -put gene_regions.vcf.snappy /

I then added the following to spark-env.sh
export SPARK_EXECUTOR_MEMORY=16G                                                
export HADOOP_HOME=/usr/local/hadoop                                            

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native             
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native                 
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native           
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.1.1.8-SNAPSHOT.jar

I then launch my spark master and slave and finally my ipython notebook where I am executing the code below.

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

ValueError Traceback (последний последний вызов) in () ----> 1 a_file.first()

/home/user/Software/spark-1.3.0-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self) 1244 if rs: 1245 return rs[0] -> 1246 поднять ValueError("RDD is пустой") 1247 1248 def isEmpty(self):

ValueError: RDD пуст

Working code (uncompressed) text file
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf")
a_file.first()

вывод: u'##fileformat=VCFv4.1'


person Levi Pierce    schedule 25.04.2015    source источник
comment
Пожалуйста, уточните свой вопрос. Кроме того, предоставьте более актуальный код (например, как сохранить файл)   -  person Mark Segal    schedule 26.04.2015


Ответы (4)


Проблема здесь в том, что python-snappy не совместим с кодеком Hadoop snappy, который Spark будет использовать для чтения данных, когда увидит суффикс «.snappy». Они основаны на одном и том же базовом алгоритме, но они несовместимы в том смысле, что вы можете сжимать с помощью одного и распаковывать с помощью другого.

Вы можете выполнить эту работу, либо записав свои данные в первую очередь для мгновенного использования Spark или Hadoop. Или заставив Spark считать ваши данные в виде двоичных BLOB-объектов, а затем вручную вызвать распаковку python-snappy самостоятельно (см. бинарные файлы здесь http://spark.apache.org/docs/latest/api/python/pyspark.html). Подход с бинарным BLOB-объектом немного более хрупок, потому что он должен помещать весь файл в память для каждого входного файла. Но если ваши данные достаточно малы, это сработает.

person Patrick Wendell    schedule 25.04.2015
comment
Спасибо, Патрик, это имеет большой смысл. Я читал еще немного о быстром кодеке Hadoop, который, похоже, используется для промежуточных файлов, созданных из преобразователя, до того, как все будет сведено обратно. Есть ли утилита командной строки, которую я могу использовать для сжатия текстовых файлов с помощью кодека hadoop snappy перед их отправкой в ​​хранилище hdfs? В основном у меня есть около 10 000 текстовых файлов по 50 миллионов строк. Похоже, это может сработать...github.com/kubo/snzip - person Levi Pierce; 27.04.2015
comment
Теперь это устарело, python-snappy поддерживает hadoop-snappy, хотя это не очень ясно. - person Jeroen; 18.03.2021

Принятый ответ устарел. Можно использовать python-snappy для сжатия hadoop-snappy, но документация практически отсутствует. Пример:

import snappy
with open('test.json.snappy', 'wb') as out_file:
    data=json.dumps({'test':'somevalue','test2':'somevalue2'}).encode('utf-8')
    compressor = snappy.hadoop_snappy.StreamCompressor()
    compressed = compressor.compress(data)
    out_file.write(compressed)

Вы также можете использовать командную строку, где опция немного более прямолинейна, используя флаг -t hadoop_snappy. Пример:

echo {'test':'somevalue','test2':'somevalue2'} | python -m snappy -t hadoop_snappy -c - test.json.snappy

person Jeroen    schedule 18.03.2021

Хорошо, я нашел решение!

Соберите это... https://github.com/liancheng/snappy-utils В Ubuntu 14.10 Мне пришлось установить gcc-4.4, чтобы он прокомментировал мою ошибку, которую я видел здесь https://code.google.com/p/hadoop-snappy/issues/detail?id=9

Теперь я могу сжимать текстовые файлы, используя snappy в командной строке, например так

snappy -c gene_regions.vcf -o gene_regions.vcf.snappy

закинуть в hdfs

hdfs dfs -put gene_regions.vcf.snappy

а затем загрузите его в pyspark!

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

Вуаля! Заголовок vcf...

u'##fileformat=VCFv4.1'
person Levi Pierce    schedule 27.04.2015

Не знаю точно, какой snappy кодек у моих файлов, но spark.read.text у меня работал без происшествий.

person ijoseph    schedule 02.06.2020