Потоковая передача Spark не вставляет данные в Cassandra

У меня есть искровый потоковый код, который работает в клиентском режиме: он читает данные из кафки, выполняет некоторую обработку и использует spark-cassandra-connector для вставки данных в кассандру.

Когда я использую «--deploy-mode cluster», данные не вставляются, и я получаю следующую ошибку:

Исключение в потоке "streaming-job-executor-53" java.lang.NoClassDefFoundError: com / datastax / spark / connector / ColumnSelector в com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob $$ anonfun $ main $ 2.apply (Wattiopipeline. scala: 94) на com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob $$ anonfun $ main $ 2.apply (WattiopipelineStreamingJob.scala: 88) на org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (ForEachDStream.scala: 50) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream .scala: 50) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream.scala: 50) в org.apache.spark.streaming.dstream .DStream.createRDDWithLocalProperties (DStream.scala: 426) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.применить $ mcV $ sp (ForEachDStream.scala: 49) в org.apach e.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream.scala: 49) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream.scala: 49) в scala. util. Попробуйте $ .apply (Try.scala: 161) в org.apache.spark.streaming.scheduler.Job.run (Job.scala: 39) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply $ mcV $ sp (JobScheduler.scala: 224) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler.scala: 224) в org.apache .spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler.scala: 224) в scala.util.DynamicVariable.withValue (DynamicVariable.scala: 57) в org.apache.spark.streaming.scheduler .JobScheduler $ JobHandler.run (JobScheduler.scala: 223) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor ($ Workero). в java.lang.Thread.run (Thread.java:745) Вызвано: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector в java.net.URLClassLoader.findClass (URLClassLoader.java:381) в java .lang.ClassLoader.loadClass (ClassLoader.java:424) в java.lang.ClassLoader.loadClass (ClassLoader.java:357)

Я добавил зависимость для коннектора вот так:

"com.datastax.spark" %% "spark-cassandra-connector"% "1.5.0"% "при условии"

Это код моего приложения:

    val measurements = KafkaUtils.createDirectStream[
  Array[Byte],
  Array[Byte],
  DefaultDecoder,
  DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
  .map {
    case (k, v) => {
      val decoder = new AvroDecoder[WattioMeasure](null,
        WattioMeasure.SCHEMA$)
      decoder.fromBytes(v)
    }
  }

//inserting into WattioRaw
WattioFunctions.run(WattioFunctions.
  processWattioRaw(measurements))(
  (rdd: RDD[
    WattioTenantRaw], t: Time) => {
    rdd.cache()
    //get all the different tenants
    val differentTenants = rdd.map(a
    => a.tenant).distinct().collect()
    // for each tenant, create keyspace value and flush to cassandra
    differentTenants.foreach(tenant => {
      val keyspace = tenant + "_readings"
      rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
    })
    rdd.unpersist(true)
  }
)

ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()

person Srdjan Nikitovic    schedule 30.03.2016    source источник
comment
как вы указываете зависимость коннектора во время выполнения? Какая у вас полная команда запуска?   -  person RussS    schedule 30.03.2016


Ответы (3)


Вы должны убедиться, что ваш JAR доступен для рабочих. Мастер искры откроет файловый сервер после начала выполнения задания.

Вам необходимо указать путь к вашей uber-банке либо с помощью SparkContext.setJars, либо с помощью флага --jars, переданного spark-submit.

Из документации

При использовании spark-submit файл jar приложения вместе с любыми jar-файлами, включенными в параметр --jars, будет автоматически перенесен в кластер. Spark использует следующую схему URL-адресов, чтобы разрешить различные стратегии распространения jar-файлов.

person Yuval Itzchakov    schedule 30.03.2016

На самом деле я решил это, удалив «предоставлено» в списке зависимостей, так что sbt упаковал spark-cassandra-connector в мою сборочную банку.

Интересно то, что в моем скрипте запуска, даже когда я пытался использовать

spark-submit --repositories "расположение моего репозитория артефактов" --packages "spark-cassandra-connector"

or

spark-submit --jars "spark-cassandra-connector.jar"

оба они потерпели неудачу!

person Srdjan Nikitovic    schedule 31.03.2016

Область действия предоставлена ​​ означает, что вы ожидаете, что JDK или контейнер предоставит зависимость во время выполнения, и эта конкретная jar-файл зависимости не будет частью вашего последнего приложения War / jar, которое вы создаете, следовательно, эту ошибку.

person Suresh    schedule 21.07.2016