У меня есть искровый потоковый код, который работает в клиентском режиме: он читает данные из кафки, выполняет некоторую обработку и использует spark-cassandra-connector для вставки данных в кассандру.
Когда я использую «--deploy-mode cluster», данные не вставляются, и я получаю следующую ошибку:
Исключение в потоке "streaming-job-executor-53" java.lang.NoClassDefFoundError: com / datastax / spark / connector / ColumnSelector в com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob $$ anonfun $ main $ 2.apply (Wattiopipeline. scala: 94) на com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob $$ anonfun $ main $ 2.apply (WattiopipelineStreamingJob.scala: 88) на org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (ForEachDStream.scala: 50) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream .scala: 50) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream.scala: 50) в org.apache.spark.streaming.dstream .DStream.createRDDWithLocalProperties (DStream.scala: 426) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.применить $ mcV $ sp (ForEachDStream.scala: 49) в org.apach e.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream.scala: 49) в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream.scala: 49) в scala. util. Попробуйте $ .apply (Try.scala: 161) в org.apache.spark.streaming.scheduler.Job.run (Job.scala: 39) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply $ mcV $ sp (JobScheduler.scala: 224) в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler.scala: 224) в org.apache .spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler.scala: 224) в scala.util.DynamicVariable.withValue (DynamicVariable.scala: 57) в org.apache.spark.streaming.scheduler .JobScheduler $ JobHandler.run (JobScheduler.scala: 223) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor ($ Workero). в java.lang.Thread.run (Thread.java:745) Вызвано: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector в java.net.URLClassLoader.findClass (URLClassLoader.java:381) в java .lang.ClassLoader.loadClass (ClassLoader.java:424) в java.lang.ClassLoader.loadClass (ClassLoader.java:357)
Я добавил зависимость для коннектора вот так:
"com.datastax.spark" %% "spark-cassandra-connector"% "1.5.0"% "при условии"
Это код моего приложения:
val measurements = KafkaUtils.createDirectStream[
Array[Byte],
Array[Byte],
DefaultDecoder,
DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
.map {
case (k, v) => {
val decoder = new AvroDecoder[WattioMeasure](null,
WattioMeasure.SCHEMA$)
decoder.fromBytes(v)
}
}
//inserting into WattioRaw
WattioFunctions.run(WattioFunctions.
processWattioRaw(measurements))(
(rdd: RDD[
WattioTenantRaw], t: Time) => {
rdd.cache()
//get all the different tenants
val differentTenants = rdd.map(a
=> a.tenant).distinct().collect()
// for each tenant, create keyspace value and flush to cassandra
differentTenants.foreach(tenant => {
val keyspace = tenant + "_readings"
rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
})
rdd.unpersist(true)
}
)
ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()