Spark 1.6 при записи EMR в S3, поскольку Parquet зависает и дает сбой

Я создаю приложение uber jar spark, которое я отправляю в кластер EMR 4.3, я инициализирую 4 экземпляра r3.xlarge, один в качестве главного, а три других в качестве ядер.

У меня были предустановленные с консоли hadoop 2.7.1, ganglia 3.7.2 spark 1.6 и hive 1.0.0.

Я выполняю следующую команду:

spark-submit \
--deploy-mode cluster \
--executor-memory 4g \
--executor-cores 2 \
--num-executors 4 
--driver-memory 4g 
--driver-cores 2  
--conf "spark.driver.maxResultSize=2g" 
--conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter"  
--conf "spark.shuffle.memoryFraction=0.2" 
--class com.jackar.spark.Main spark-app.jar args

Я понимаю, что не полностью использую кластер, но на данный момент я не совсем пытаюсь настроить (или, может быть, это то, что мне следует делать?). Моя основная работа делает что-то вроде:

1) Прочтите паркетные файлы из s3, которые представляют два набора данных, запустите registerTempTable для Dataframes, затем запустите cacheTable для каждого. У них примерно по 300 мб памяти. (примечание: я пробовал использовать протокол EMR s3: //, а также s3a: //)

2) Используйте Spark sql для запуска агрегатов (то есть суммирования и группировки).

3) Запишите результаты в s3 как паркетные файлы.

Когда я смотрю на пользовательский интерфейс Spark, задания выполняются нормально, и они занимают столько времени, сколько я ожидал. Проблема в том, что после завершения задания write-agg-to-parquet-in-s3 (вкладка Job) есть период времени, когда никакие другие задания не попадают в очередь.

Если я затем перейду на вкладку SQL в пользовательском интерфейсе Spark, я заметил, что есть «выполняющийся запрос» для того же задания, которое, как указано на вкладке «Задания», завершено. Когда я щелкаю и смотрю на DAG для этого запроса, я замечаю, что DAG, кажется, уже оценен.

Визуализация DAG

Однако этот запрос занимает минуты и иногда приводит к перезапуску всего приложения Spark и, в конечном итоге, сбою ...

Spark UI: вкладка SQL

Я начал исследовать, смогу ли я выяснить проблему, потому что в моей пробной версии Databricks это задание выполняется невероятно быстро, а DAG идентичен таковому в EMR (как и ожидалось). Но я не могу заставить себя оправдать использование Databricks, когда я понятия не имею, почему я не вижу аналогичной производительности в EMR.

Может дело в моих параметрах JVM? Например, сборщик мусора? Пора проверить логи исполнителя.

2016-02-23T18:25:48.598+0000: [GC2016-02-23T18:25:48.598+0000: [ParNew: 299156K->30449K(306688K), 0.0255600 secs] 1586767K->1329022K(4160256K), 0.0256610 secs] [Times: user=0.05 sys=0.00, real=0.03 secs] 
2016-02-23T18:25:50.422+0000: [GC2016-02-23T18:25:50.422+0000: [ParNew: 303089K->32739K(306688K), 0.0263780 secs] 1601662K->1342494K(4160256K), 0.0264830 secs] [Times: user=0.07 sys=0.01, real=0.02 secs] 
2016-02-23T18:25:52.223+0000: [GC2016-02-23T18:25:52.223+0000: [ParNew: 305379K->29373K(306688K), 0.0297360 secs] 1615134K->1348874K(4160256K), 0.0298410 secs] [Times: user=0.08 sys=0.00, real=0.03 secs] 
2016-02-23T18:25:54.247+0000: [GC2016-02-23T18:25:54.247+0000: [ParNew: 302013K->28521K(306688K), 0.0220650 secs] 1621514K->1358123K(4160256K), 0.0221690 secs] [Times: user=0.06 sys=0.01, real=0.02 secs] 
2016-02-23T18:25:57.994+0000: [GC2016-02-23T18:25:57.994+0000: [ParNew: 301161K->23609K(306688K), 0.0278800 secs] 1630763K->1364319K(4160256K), 0.0279460 secs] [Times: user=0.07 sys=0.01, real=0.03 secs]

Хорошо. Это не выглядит хорошо. Parnew останавливает мир, и это происходит каждые пару секунд.

Следующий шаг, изучение пользовательского интерфейса Spark на Databricks, чтобы увидеть, отличается ли конфигурация gc от EMR. Я нашел кое-что интересное. Databricks устанавливает для spark.executor.extraJavaOptions значение:

-XX:ReservedCodeCacheSize=256m 
-XX:+UseCodeCacheFlushing 
-javaagent:/databricks/DatabricksAgent.jar 
-XX:+PrintFlagsFinal 
-XX:+PrintGCDateStamps 
-verbose:gc 
-XX:+PrintGCDetails 
-XX:+HeapDumpOnOutOfMemoryError 
-Ddatabricks.serviceName=spark-executor-1

Привет, я не эксперт по gc, и я добавил «научиться настраивать gc» в свой список задач, но я вижу здесь нечто большее, чем просто параметры gc для исполнителей. Что делает DatabricksAgent.jar - это помогает? Я не уверен, поэтому я заставляю свою искру использовать параметры java для исполнителей за вычетом конкретных вещей для блоков данных:

--conf spark.executor.extraJavaOptions="-XX:ReservedCodeCacheSize=256m -XX:+UseCodeCacheFlushing -XX:+PrintFlagsFinal -XX:+PrintGCDateStamps -verbose:gc -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError"

Это не меняет поведения «выполняющегося запроса» - это все равно занимает вечность - но я получаю PSYoungGen вместо Parnew (хотя частота по-прежнему составляет каждые пару секунд):

2016-02-23T19:40:58.645+0000: [GC [PSYoungGen: 515040K->12789K(996352K)] 1695803K->1193777K(3792896K), 0.0203380 secs] [Times: user=0.03 sys=0.01, real=0.02 secs] 
2016-02-23T19:57:50.463+0000: [GC [PSYoungGen: 588789K->13391K(977920K)] 1769777K->1196033K(3774464K), 0.0237240 secs] [Times: user=0.04 sys=0.00, real=0.02 secs] 

Если вы дочитали до этого места, я хваляю вас. Я знаю, сколько у этого поста.

Еще один симптом, который я обнаружил, заключается в том, что пока выполняется запрос, stderr и stdout находятся в состоянии покоя, и никакие новые строки журнала не добавляются ни в один исполнитель (включая драйвер).

i.e.

16/02/23 19:41:23 INFO ContextCleaner: Cleaned shuffle 5
16/02/23 19:57:32 INFO DynamicPartitionWriterContainer: Job job_201602231940_0000 committed.

Тот же интервал ~ 17 минут учитывается в пользовательском интерфейсе Spark как выполняющийся запрос ... Есть идеи, что происходит?

В конце концов, это задание имеет тенденцию перезапускаться после того, как несколько agg записываются в S3 (скажем, 10% из них), а затем, в конечном итоге, приложение Spark выходит из строя.

Я не уверен, связана ли проблема с тем, что EMR работает в YARN, а Databricks работает в автономном кластере, или это совершенно не связано.

Ошибка, которую я получаю после просмотра журналов пряжи, выглядит следующим образом:

java.io.FileNotFoundException: No such file or directory: s3a://bucket_file_stuff/_temporary/0/task_201602232109_0020_m_000000/

Любой совет очень ценится. Я буду добавлять заметки по ходу. Спасибо!


person jackar    schedule 23.02.2016    source источник
comment
Вы видели этот пост stackoverflow.com/questions/35002184/?   -  person Glennie Helles Sindholt    schedule 24.02.2016
comment
Как отмечает @GlennieHellesSindholt, это появляется в ряде других вопросов. См. stackoverflow.com/q/35236375/4394783. По сути, spark.hadoop.spark.sql.parquet.output.committer.class необходимо установить в Hadoop conf, а не в Spark conf; это не действует в вашем примере. Также используйте s3: // или s3n: //.   -  person ChristopherB    schedule 26.02.2016
comment
фактически согласно Amazon вы всегда должны использовать s3: // по EMR   -  person Tal Joffe    schedule 23.08.2016
comment
небольшая поправка поверх комментария @ChristopherB - при настройке в hadoopConfiguration контекста искры достаточно установить spark.sql.parquet.output.committer.class (без префикса spark.hadoop)   -  person Yonatan Wilkof    schedule 29.08.2017
comment
@jackar ты это исправил, если да, то как? не могли бы вы предложить, как справиться с этим stackoverflow.com/questions/62036791/   -  person BdEngineer    schedule 27.05.2020