Резюме статьи

Недавний опыт помощи в реализации Spark ALS привел к двум непонятным ошибкам, связанным с отключениями RPC и переполнением стека. Если это происходит с вами, этот пост может помочь.

Контекст

Недавно я помогал коллеге с внедрением Spark ALS. Они использовали ALS (MLLib)вPySparkв блокноте Databricksна AWS.

Они завершили свой конвейер Spark на основе ALS, и он дал желаемые результаты. Однако…

Изменение

Хотя их код был завершен, они определили, что параметры, используемые на этапе сборки модели ALS, нуждаются в корректировке, поэтому они внесли изменение: они увеличен параметр maxIters функции построения модели. В частности, они увеличили maxIters с 3 до 10 (10 — значение по умолчанию для БАС).

Это все еще работало нормально! :)

Но потом они увеличили maxIters с 10 до 100.

Это не сработало. :(

Они начали сталкиваться со странными ошибками для простых и небольших (или на первый взгляд небольших) запросов (в частности, Действий Spark), таких как .count() и .collect() на маленьких (опять же, на первый взгляд small) DataFrames.

Примечание. .count() приводит к .collect() в фоновом режиме, поэтому .collect() оказался ключевым виновником возникновения этих ошибок.

Ошибки начинались так:

Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 35, localhost): java.lang.StackOverflowError

Во-первых, мы попытались настроить выход из этих ошибок, установив и настроив такие параметры, как:

  • spark.driver.maxResultSize (ограничивает общий размер сериализованных результатов всех разделов для каждого действия Spark (например, сбора) в байтах)
  • spark.driver.memory (устанавливает объем памяти для процесса-драйвера/исполнителя)
  • spark.network.timeout (устанавливает время ожидания по умолчанию для всех сетевых взаимодействий. Это значение используется вместо spark.storage.blockManagerHeartbeatTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout или spark.rpc.lookupTimeout, если они не настроены)

Они практически не повлияли — мы все еще сталкивались с теми же ошибками. Мы заметили ошибку RPC немного меньше, но ошибка StackOverflow по-прежнему последовательно следовала за действиями Spark, такими как .count() или .collect() (опять же, оба из них вызывают внутренний .collect() метод).

После менее чем плодотворных нескольких часов, потраченных на попытки заставить конвейер работать без этих ошибок, мы попытались повторно просмотреть документацию для ALS, чтобы увидеть, не настроили ли мы его неправильно в какой-то момент — и мы особенно задались вопросом количество запрашиваемых интервалов (100) по сравнению с 10 по умолчанию.

В то же время мы заметили, что в пользовательском интерфейсе Spark генерировалось огромное количество этапов и задач, и это, казалось, коррелировало напрямую количество итераций, которые мы запрашивали для ALS.

Отсюда мы сделали несколько тестов:

  • мы пытались изменить только maxIters на 3 (это сработало нормально)
  • мы попытались изменить maxIters на 10 (по умолчанию) (тоже сработало)
  • мы пробовали менять maxIters на 100 (оригинал) (возвращались ошибки/сбои)

Теперь мы к чему-то пришли. Итак, мы начали гуглить ALS Spark Interval StackOverflow RPC errors и нашли несколько обсуждений на эту тему:

Мы обнаружили, что мы должны были использовать две очень важные конфигурации, прежде чем построить и обучить нашу модель ALS:

spark.sparkContext.setCheckpointDir("/some/path/")
ALS.checkpointInterval = some_number

Вот их определения:

setCheckpointDir = Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster. (мы использовали /tmp/checkpoint/ в нашей записной книжке на модулях данных)

интервал контрольной точки = Set period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery (when nodes fail) and StackOverflow exceptions caused by long lineage. It also helps with eliminating temporary shuffle files on disk, which can be important when there are many ALS iterations. If the checkpoint directory is not set in SparkContext, this setting is ignored.

Итак, мы, наконец, сложили два и два вместе: наши maxIters напрямую влияли на количество генерируемых этапов и задач Spark, а это напрямую влияло на размер и происхождение нашего приложения Spark, что приводило к ошибкам StackOverflow и RPC. Отключается!

Огромный объем этапов и задач становился настолько огромным, что приложение (то есть JVM) не справлялось. Поэтому нам нужно было использовать контрольные точки (т. е. разгрузку RDD через заданные интервалы), чтобы уменьшить происхождение (и, следовательно, нагрузку) нашего приложения Spark (опять же, JVM драйвера).

После того, как мы установили spark.sparkContext.setCheckpointDir в /tmp/checkpoint, а ALS.checkpointInterval в 2, мы больше не сталкивались с этими ошибками — даже с maxIters в 100. Это не означает, что вы этого не сделаете — все зависит от количества используемых вами DataFrames и RDD. В конце концов, хотя Spark очень мощный, он не лишен необходимости настройки.