Spark Dataframe загружает 500k файлов на EMR

Я выполняю задание pyspark в EMR (5.5.1) с Spark 2.1.0, Hadoop 2.7.3, Hive 2.1.1, Sqoop 1.4.6 и Ganglia 3.7.2, которое загружает данные из s3. Есть несколько сегментов, которые содержат входные файлы, поэтому у меня есть функция, которая использует boto, чтобы проходить по ним и отфильтровывать их по некоторому шаблону.

Размер кластера: Master => r4.xlarge, Workers => 3 x r4.4xlarge

Проблема: функция getFilePaths возвращает список путей s3, который напрямую передается методу загрузки фрейма данных Spark.

Использование Dataframe

file_list = getFilePaths() # ['s3://some_bucket/log.json.gz','s3://some_bucket/log2.json.gz']
schema = getSchema()  # for mapping to the json files
df = sparkSession.read.format('json').load(file_list, schema=schema)

Использование RDD

master_rdd = sparkSession.sparkContext.union(
    map(lambda file: sparkSession.sparkContext.textFile(file), file_list)
)
df = sparkSession.createDataFrame(master_rdd, schema=schema)

file_list может быть огромным списком (не более 500 тыс. Файлов) из-за большого количества данных и файлов. Расчет этих путей занимает всего 5-20 минут, но при попытке загрузить их как фрейм данных с помощью Spark пользовательский интерфейс искры остается неактивным в течение часов, то есть вообще ничего не обрабатывает. Период бездействия для обработки 500 КБ файлов составляет более 9 часов, а для 100 КБ файлов - около 1,5 часов.

Просмотр метрик Gangilla показывает, что только драйвер работает / обрабатывает, пока рабочие простаивают. Журналы не создаются до тех пор, пока искровое задание не будет завершено, и я не добился успеха с файлами размером 500 КБ.

введите описание изображения здесь

Я пробовал разъемы s3, s3n, но безуспешно.

Вопрос:

  • Выяснить основную причину этой задержки?
  • Как мне его правильно отладить?

person Abdul Mannan    schedule 13.06.2018    source источник
comment
Были некоторые проблемы с искрой, не читающей файлы параллельно, и пошел по этому маршруту stackoverflow.com/questions/28685874/   -  person David    schedule 13.06.2018


Ответы (2)


В общем, Spark / Hadoop предпочитают иметь большие файлы, которые они могут разделить, вместо огромного количества маленьких файлов. Один из подходов, который вы можете попробовать, - это распараллелить список файлов и затем загрузить данные в вызове карты.

У меня сейчас нет ресурсов, чтобы проверить это, но это должно быть примерно так:

file_list = getFilePaths()
schema = getSchema()  # for mapping to the json files

paths_rdd = sc.parallelize(file_list)

def get_data(path):
    s3 = boto3.resource('s3')

    obj = s3.Object(bucket, path)
    data = obj.get()['Body'].read().decode('utf-8')
    return [json.loads(r) for r in data.split('\n')]

rows_rdd = rdd.flatMap(get_data)
df = spark.createDataFrame(rows_rdd, schema=schema)

Вы также можете сделать это немного более эффективным, используя вместо этого mapPartition, чтобы вам не приходилось каждый раз воссоздавать объект s3.

ИЗМЕНИТЬ 14.06.18:

Что касается обработки данных gzip, вы можете распаковать поток данных gzip с помощью python, как подробно описано в этом ответе: https://stackoverflow.com/a/12572031/1461187. В основном просто передайте obj.get()['Body'].read() в функцию, определенную в этом ответе.

person Ryan Widmaier    schedule 13.06.2018
comment
Эти файлы имеют размер ок. Размер каждого из них составляет 60 МБ, поэтому я не могу их объединить. Я попробую и посмотрю, поможет ли это улучшить производительность. - person Abdul Mannan; 13.06.2018
comment
У вас должна быть возможность комбинировать их, если вы выбрали формат файла, который можно разделять (т.е. не gzip). Вероятно, потребуются некоторые дополнительные шаги по загрузке данных для преобразования ваших данных, но с этим будет намного проще работать, если вы можете просто преобразовать все в Parquet или Avro для начала, прежде чем сохранять на S3 или выполнять какую-либо обширную работу с ним. Тогда вы получите чтение схемы и разделение файлов прямо из коробки без дополнительной работы. - person Ryan Widmaier; 13.06.2018
comment
Да, это мое намерение, но сначала я хочу заставить это работать. Json работает намного медленнее, поэтому я думаю о паркете, прежде чем приступить к их обработке. - person Abdul Mannan; 13.06.2018
comment
Parquet отлично подходит для работы с HDFS, но для S3 убедитесь, что вы посмотрите на оптимизацию, которую вам нужно установить, чтобы избежать лишнего написания. По этой причине я вообще предпочитаю avro с S3. - person Ryan Widmaier; 13.06.2018
comment
Есть ли способ напрямую использовать пути к файлам с фреймом данных? Я бы не хотел работать с rdds - person Abdul Mannan; 13.06.2018
comment
Вы могли бы сделать то же самое, используя UDF, но я не думаю, что это вас слишком сильно спасет, поскольку для загрузки данных все равно потребуется запускать код Python на исполнителях. Единственный известный мне способ - это то, как вы уже пытались использовать список файлов или глобус для функции загрузки DF. - person Ryan Widmaier; 13.06.2018
comment
Вы забыли, что это файлы gzip, поэтому нам нужно разархивировать, прежде чем мы сможем преобразовать в json (что я не уверен, насколько это повлияет на производительность + использование рабочей памяти в зависимости от размера файла) - person Abdul Mannan; 14.06.2018

Возникают две проблемы с производительностью

  1. чтение файлов: файлы gzip нельзя разделить, чтобы их рабочая нагрузка распределялась между рабочими, хотя с файлами 50 МБ мало пользы от разделения вещей
  2. То, как искра соединителей S3 использует имитацию структуры каталогов, является настоящим убийцей производительности для сложных деревьев каталогов.

Проблема №2 - то, что замедляет разбиение: начальный код для решения, что делать, который выполняется перед любым вычислением.

Как бы я попытался с этим справиться? Что ж, здесь нет волшебного переключателя. Но

  • иметь меньше файлов большего размера; как уже отмечалось, Avro хорош, Parquet и ORC позже.
  • используйте очень мелкое дерево каталогов. Все ли эти файлы в одном каталоге? Или в глубоком дереве каталогов? Последнее хуже.

Сначала объедините файлы.

Я бы также избегал каких-либо выводов схемы; похоже, что вы этого не делаете (хорошо!), но для всех, кто читает этот ответ: знайте, что для CSV и, предположительно, JSON вывод схемы означает «прочитать все данные один раз, чтобы выработать схему»

person stevel    schedule 14.06.2018
comment
К сожалению, они находятся в глубокой структуре каталогов. У меня есть планы использовать другой формат сжатия, чем json, но есть некоторые ограничения, поэтому мне нужно хотя бы заставить его работать с текущими настройками. - person Abdul Mannan; 14.06.2018
comment
изменения сжатия ускорят изменения рабочих процессов, но ничего не сделают для процесса разбиения на разделы. Извините - person stevel; 14.06.2018