Метод Spark RDD saveAsTextFile выдает исключение Даже после удаления выходного каталога. org.apache.hadoop.mapred.FileAlreadyExistsException

Я вызываю этот метод в RDD [String] с назначением в аргументах. (Скала)

Даже после удаления каталога перед запуском процесс выдает эту ошибку. Я запускаю этот процесс в кластере EMR с расположением вывода на aws S3. Ниже используется команда:

spark-submit --deploy-mode cluster --class com.hotwire.hda.spark.prd.pricingengine.PRDPricingEngine --conf spark.yarn.submit.waitAppCompletion=true --num-executors 21 --executor-cores 4 --executor-memory 20g --driver-memory 8g --driver-cores 4 s3://bi-aws-users/sbatheja/hotel-shopper-0.0.1-SNAPSHOT-jar-with-dependencies.jar -d 3 -p 100 --search-bucket s3a://hda-prod-business.hotwire.hotel.search --prd-output-path s3a://bi-aws-users/sbatheja/PRD/PriceEngineOutput/

Бревно:

16/07/07 11:27:47 INFO BlockManagerMaster: BlockManagerMaster stopped
16/07/07 11:27:47 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/07/07 11:27:47 INFO SparkContext: Successfully stopped SparkContext
16/07/07 11:27:47 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: **org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3a://bi-aws-users/sbatheja/PRD/PriceEngineOutput already exists)**
16/07/07 11:27:47 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/07/07 11:27:47 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/07/07 11:27:47 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
16/07/07 11:27:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/07/07 11:27:47 INFO ApplicationMaster: Deleting staging directory .sparkStaging/application_1467889642439_0001
16/07/07 11:27:47 INFO ShutdownHookManager: Shutdown hook called
16/07/07 11:27:47 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1467889642439_0001/spark-7f836950-a040-4216-9308-2bb4565c5649

В этом месте создается каталог «_porary», содержащий пустые файлы деталей.


person Saurabh Batheja    schedule 07.07.2016    source источник
comment
Вы уверены, что эта папка не существует до запуска задания? Почему вы используете s3a, а не s3 или s3n?   -  person Avihoo Mamka    schedule 07.07.2016
comment
Да, я удалил каталог раньше всего. в основном причина в том, что s3 поддерживает до 5 ГБ, а s3a не имеет такого ограничения. Пробовал и с s3. та же проблема :(   -  person Saurabh Batheja    schedule 07.07.2016
comment
Может быть, ваша проблема где-то еще в коде, который не работает, и поэтому временные файлы, и у вас есть какой-то механизм повтора, который пытается запустить код снова, а затем терпит неудачу, потому что каталог уже существует с предыдущей попыткой и оставшимися?   -  person Avihoo Mamka    schedule 07.07.2016
comment
Возможно ли, что вы сохраняете несколько раз на один и тот же адрес? Также после сбоя войдите в hdfs и проверьте, что находится в этом каталоге, и выясните, на каком этапе вы находитесь.   -  person GameOfThrows    schedule 07.07.2016
comment
На самом деле я пишу в файл только один раз за весь процесс, и это тоже последний шаг .. может это быть связано со структурой каталогов s3?   -  person Saurabh Batheja    schedule 07.07.2016


Ответы (1)


Вкратце:
Убедитесь, что версии spark-core и scala-library в scala согласованы.


Я столкнулся с той же проблемой. Когда я сохраняю файл в HDFS, возникает исключение: org.apache.hadoop.mapred.FileAlreadyExistsException
Затем я проверил каталог файлов HDFS, там есть пустая временная папка: TARGET_DIR/_temporary/0.

Вы можете отправить задание, открыть подробную конфигурацию: ./spark-submit --verbose. Затем просмотрите полный контекст и журнал, должны быть другие ошибки. Моя работа в состоянии RUNNING, выдается первая ошибка:

17/04/23 11:47:02 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;

Затем задание будет повторено и выполнено повторно. В этот раз при повторной реализации задания он обнаружит, что каталог был создан. А также кидает каталог уже существующий.

После подтверждения, что первая ошибка - это проблемы совместимости версий. Версия Spark - 2.1.0, соответствующая версия spark-core scala - 2.11, а scala-library зависимость версии scala - 2.12.xx.

Когда две версии изменения scala согласованы (обычно это версия scala-library), вы можете решить первую проблему исключения, тогда задание может быть нормальным FINISHED.
pom.xml Пример:

<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- scala -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.7</version>
</dependency>
person zxholy    schedule 23.04.2017