Как мне получить Spark на emr-5.2.1 для записи в Dynamodb?

Согласно это статья здесь, когда я создаю кластер aws emr, который будет использовать искру для передачи данных в Dynamodb, мне нужно предварять строку:

spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar

Эта строка встречается во многих источниках, включая самих разработчиков Amazon. Однако, когда я запускаю create-cluster с добавленным флагом --jars, я получаю эту ошибку:

Exception in thread "main" java.io.FileNotFoundException: File file:/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:616)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:829)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:606)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:431)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
...

Ответ на этот вопрос SO < / a>, что библиотека должна быть включена в emr-5.2.1, поэтому я попытался запустить свой код без лишнего флага --jars:

ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/hadoop/dynamodb/DynamoDBItemWritable
java.lang.NoClassDefFoundError: org/apache/hadoop/dynamodb/DynamoDBItemWritable
at CopyS3ToDynamoApp$.main(CopyS3ToDynamo.scala:113)
at CopyS3ToDynamoApp.main(CopyS3ToDynamo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.dynamodb.DynamoDBItemWritable
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Просто для усмешки я попробовал альтернативу, предложенную другим ответом на этот вопрос, добавив --driver-class-path,/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar, к моему шагу, и мне сказали:

Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2702)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2715)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)

Невозможность найти s3a.S3AFileSystem кажется большой проблемой, особенно с учетом того, что у меня есть другие задания, которые прекрасно читают из s3, но, очевидно, чтение из s3 и запись в динамо-машину сложно. Есть идеи, как решить эту проблему?

Обновление: я решил, что s3 не был найден, потому что я переопределил путь к классам и удалил все другие библиотеки, поэтому я обновил путь к классам следующим образом:

class_path = "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:" \
             "/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:" \
             "/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:" \
             "/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:" \
             "/usr/share/aws/emr/ddb/lib/*"

И теперь я получаю такую ​​ошибку:

 diagnostics: User class threw exception: java.lang.NoClassDefFoundError: org/apache/hadoop/dynamodb/DynamoDBItemWritable
 ApplicationMaster host: 10.178.146.133
 ApplicationMaster RPC port: 0
 queue: default
 start time: 1484852731196
 final status: FAILED
 tracking URL: http://ip-10-178-146-68.syseng.tmcs:20888/proxy/application_1484852606881_0001/

Таким образом, похоже, что библиотеки нет в том месте, которое указано в документации AWS. Кто-нибудь заставил это работать?


person mmr    schedule 19.01.2017    source источник


Ответы (2)


Хорошо, на выяснение этого у меня ушло несколько дней, так что я пощажу тех, кто придет рядом, чтобы задать этот вопрос.

Причина того, что эти методы не работают, заключается в том, что путь, указанный специалистами AWS, не существует в кластерах emr 5.2.1 (и, возможно, вообще не существует ни в одном кластере emr 5.0).

Поэтому вместо этого я загрузил версию 4.2 emr- jar-файл Dynamodb-hadoop от Maven.

Поскольку jar не находится в кластере emr, вам нужно будет включить его в свой jar. Если вы используете sbt, вы можете использовать сборку sbt. Если вы не хотите, чтобы такая монолитная банка продолжалась (и вам нужно выяснить разрешение конфликта между версиями 1.7 и 1.8 netbeans), вы также можете просто объедините jar-файлы в процессе сборки. Таким образом, у вас есть одна банка для вашего шага emr, которую вы можете поставить на s3 для простых create-cluster искровых работ по запросу.

person mmr    schedule 20.01.2017

Я использовал https://github.com/audienceproject/spark-dynamodb для подключения искры к Dynamodb на emr.Если вы попытаетесь использовать версию Scala 2.12.X, возникнет много проблем, ниже приведены конфигурации.

Spark 2.3.3, Scala 2.11.12, Spark-Dynamodb_2.11 0.4.4, guva 14.0.1.

Это работает на EMR emr-5.22.0 без каких-либо проблем.

Образец кода.

def main (args: Array[String] ): Unit = {

  val spark = SparkSession.builder
  .appName ("DynamoController1")
  .master ("local[*]")
  .getOrCreate

  val someData = Seq (
  Row (313080991, 1596115553835L, "U", "Insert", "455 E 520th Ave qqqqq", "AsutoshC", "paridaC", 1592408065),
  Row (313080881, 1596115553835L, "I", "Insert", "455 E 520th Ave qqqqq", "AsutoshC", "paridaC", 1592408060),
  Row (313080771, 1596115664774L, "U", "Update", "455 E 520th Ave odisha", "NishantC", "KanungoC", 1592408053)
  )

  val candidate_schema = StructType (Array (StructField ("candidateId", IntegerType, false), StructField ("repoCreateDate", LongType, true),
  StructField ("accessType", StringType, true), StructField ("action", StringType, true), StructField ("address1", StringType, true)
  , StructField ("firstName", StringType, true), StructField ("lastName", StringType, true), StructField ("updateDate", LongType, true) ) )

  var someDF = spark.sqlContext.createDataFrame (
  spark.sqlContext.sparkContext.parallelize (someData),
  StructType (candidate_schema) )

  someDF = someDF.withColumn ("datetype_timestamp", to_timestamp (col ("updateDate") ) )
  someDF.createOrReplaceTempView ("rawData")

  val sourceCount = someDF.select (someDF.schema.head.name).count
  logger.info (s"step [1.0.1] Fetched $sourceCount")
  someDF.show ()

  val compressedDF: DataFrame = spark.sqlContext.sql (s"Select candidateId, repoCreateDate,accessType,action,address1,firstName, lastName,updateDate from rawData ")
  compressedDF.show (20);
  compressedDF.write.dynamodb ("xcloud.Candidate")

  var dynamoDf = spark.read.dynamodb ("xcloud.Candidate")
  var dynamoDf = spark.read.dynamodbAs[candidate_schema] ("xcloud.Candidate")
  dynamoDf.show ();

}

Надеюсь, это кому-то поможет !!!

person Asutosh Parida    schedule 27.08.2020