Как прочитать большой файл массива JSON в PySpark

Проблема

Недавно я столкнулся с проблемой в Azure Data Lake Analytics, когда попытался прочитать файл большого массива JSON UTF-8 и переключился на HDInsight PySpark (v2.x, а не 3) для обработки файла. Размер файла ~ 110 ГБ и ~ 150 м объектов JSON.

HDInsight PySpark, похоже, не поддерживает массив файлов формата JSON для ввода, поэтому я застрял. Кроме того, у меня есть «много» таких файлов с разными схемами, каждый из которых содержит по сотне столбцов, поэтому создание схем для них на данный момент не вариант.

Вопрос

Как использовать готовые функции PySpark 2 в HDInsight, чтобы эти файлы можно было читать как JSON?

Спасибо,

J

Что я пробовал

Я использовал подход, описанный внизу этой страницы: из Databricks, который предоставил приведенный ниже фрагмент кода:

import json

df = sc.wholeTextFiles('/tmp/*.json').flatMap(lambda x: json.loads(x[1])).toDF()
display(df)

Я попробовал описанное выше, не понимая, как работает "wholeTextFiles", и, конечно же, столкнулся с ошибками OutOfMemory, которые быстро убили моих исполнителей.

Я попытался загрузить в RDD и другие открытые методы, но PySpark, похоже, поддерживает только формат файла JSONLines JSON, а у меня есть массив объектов JSON из-за требований ADLA к этому формату файла.

Я пробовал читать в виде текстового файла, удаляя символы массива, разделяя границы объекта JSON и преобразовывая в JSON, как указано выше, но это продолжало выдавать ошибки о невозможности преобразовать Unicode и / или str (ings).

Я нашел способ справиться с вышеизложенным и преобразовал его в фрейм данных, содержащий один столбец со строками строк, которые были объектами JSON. Однако я не нашел способа вывести только строки JSON из строк фрейма данных в выходной файл отдельно. Всегда выходил как

{'dfColumnName':'{...json_string_as_value}'}

Я также попробовал функцию карты, которая принимает указанные выше строки, разбирается как JSON, извлекает значения (мне нужен JSON), а затем анализирует значения как JSON. Казалось, это сработало, но когда я пытался сохранить, RDD имел тип PipelineRDD и не имел метода saveAsTextFile (). Затем я попробовал метод toJSON, но продолжал получать ошибки типа «не нашел действительного объекта JSON», которого я, по общему признанию, не понимал, и, конечно же, другие ошибки преобразования.


person jatal    schedule 10.02.2018    source источник


Ответы (1)


Наконец-то я нашел путь вперед. Я узнал, что могу читать json прямо из RDD, включая PipelineRDD. Я нашел способ удалить заголовок порядка байтов в Юникоде, заключить массив в квадратные скобки, разделить объекты JSON на основе удачного разделителя и получить распределенный набор данных для более эффективной обработки. Фрейм данных вывода теперь имеет столбцы, названные в честь элементов JSON, выводят схему и динамически адаптируются для других форматов файлов.

Вот код - надеюсь, поможет !:

#...Spark considers arrays of Json objects to be an invalid format
#    and unicode files are prefixed with a byteorder marker
#
thanksMoiraRDD = sc.textFile( '/a/valid/file/path', partitions ).map(
    lambda x: x.encode('utf-8','ignore').strip(u",\r\n[]\ufeff") 
)

df = sqlContext.read.json(thanksMoiraRDD)
person jatal    schedule 10.02.2018