Я выполняю задание pyspark в EMR (5.5.1) с Spark 2.1.0, Hadoop 2.7.3, Hive 2.1.1, Sqoop 1.4.6 и Ganglia 3.7.2, которое загружает данные из s3. Есть несколько сегментов, которые содержат входные файлы, поэтому у меня есть функция, которая использует boto, чтобы проходить по ним и отфильтровывать их по некоторому шаблону.
Размер кластера: Master => r4.xlarge, Workers => 3 x r4.4xlarge
Проблема: функция getFilePaths
возвращает список путей s3, который напрямую передается методу загрузки фрейма данных Spark.
Использование Dataframe
file_list = getFilePaths() # ['s3://some_bucket/log.json.gz','s3://some_bucket/log2.json.gz']
schema = getSchema() # for mapping to the json files
df = sparkSession.read.format('json').load(file_list, schema=schema)
Использование RDD
master_rdd = sparkSession.sparkContext.union(
map(lambda file: sparkSession.sparkContext.textFile(file), file_list)
)
df = sparkSession.createDataFrame(master_rdd, schema=schema)
file_list
может быть огромным списком (не более 500 тыс. Файлов) из-за большого количества данных и файлов. Расчет этих путей занимает всего 5-20 минут, но при попытке загрузить их как фрейм данных с помощью Spark пользовательский интерфейс искры остается неактивным в течение часов, то есть вообще ничего не обрабатывает. Период бездействия для обработки 500 КБ файлов составляет более 9 часов, а для 100 КБ файлов - около 1,5 часов.
Просмотр метрик Gangilla показывает, что только драйвер работает / обрабатывает, пока рабочие простаивают. Журналы не создаются до тех пор, пока искровое задание не будет завершено, и я не добился успеха с файлами размером 500 КБ.
Я пробовал разъемы s3, s3n, но безуспешно.
Вопрос:
- Выяснить основную причину этой задержки?
- Как мне его правильно отладить?