collect () или toPandas () в большом DataFrame в pyspark / EMR

У меня есть кластер EMR одной машины «c3.8xlarge», после прочтения нескольких ресурсов я понял, что должен разрешить приличный объем памяти вне кучи, потому что я использую pyspark, поэтому я настроил кластер следующим образом:

Один исполнитель:

  • spark.executor.memory 6g
  • spark.executor.cores 10
  • spark.yarn.executor.memoryOverhead 4096

Драйвер:

  • spark.driver.memory 21g

Когда я cache() DataFrame, он занимает около 3,6 ГБ памяти.

Теперь, когда я вызываю collect() или toPandas() в DataFrame, процесс вылетает.

Я знаю, что привожу в драйвер большой объем данных, но я думаю, что он не такой большой, и я не могу понять причину сбоя.

Когда я звоню collect() или toPandas(), я получаю такую ​​ошибку:

Py4JJavaError: An error occurred while calling o181.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 6.0 failed 4 times, most recent failure: Lost task 5.3 in stage 6.0 (TID 110, ip-10-0-47-207.prod.eu-west-1.hs.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1511879540686_0005_01_000016 on host: ip-10-0-47-207.prod.eu-west-1.hs.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

==== Обновить ====

Как предложил @ user6910411, я попробовал решение, упомянутое здесь, и в этом случае я получаю следующее ошибка:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 2.0 failed 4 times, most recent failure: Lost task 7.3 in stage 2.0 (TID 41, ip-10-0-33-57.prod.eu-west-1.hs.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 13.5 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

Есть намёк на то, что здесь происходит?


person Rami    schedule 28.11.2017    source источник


Ответы (3)


TL; DR Я считаю, что вы серьезно недооцениваете требования к памяти.

Даже если предположить, что данные полностью кэшированы, информация о хранилище покажет только часть пиковой памяти, необходимой для передачи данных драйверу.

  • Прежде всего, Spark SQL использует для кэширования сжатое столбчатое хранилище. В зависимости от распределения данных и алгоритма сжатия размер в памяти может быть намного меньше, чем несжатый вывод Pandas, не говоря уже о простом List[Row]. Последний также сохраняет имена столбцов, что еще больше увеличивает использование памяти.
  • Сбор данных является косвенным, данные хранятся как на стороне JVM, так и на стороне Python. Хотя память JVM может быть освобождена после прохождения данных через сокет, пиковое использование памяти должно учитывать и то, и другое.
  • Обычная реализация toPandas сначала собирает Rows, затем локально. Это еще больше увеличивает (возможно, вдвое) использование памяти. К счастью, эта часть уже решена на мастере (Spark 2.3) с более прямым подходом с использованием сериализации Arrow (SPARK-13534 - Реализация сериализатора Apache Arrow для Spark DataFrame для использования в DataFrame.toPandas).

    Для возможного решения, независимого от Apache Arrow, вы можете проверить Более быстрая и более низкая реализация памяти для Pandas в списке разработчиков Apache Spark.

Поскольку данные на самом деле довольно большие, я бы подумал о том, чтобы записать их в Parquet и прочитать их прямо в Python с помощью PyArrow (Чтение и запись формата Apache Parquet), полностью пропуская все промежуточные этапы.

person zero323    schedule 28.11.2017

Как упоминалось выше, при вызове toPandas () все записи DataFrame собираются в программу драйвера и, следовательно, должны выполняться на небольшом подмножестве данных. (https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)

person Dafni Argyro Krystallidou    schedule 21.08.2019

Используя настройку стрелки, вы увидите ускорение

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
person Yash Gupta    schedule 11.07.2020