Цель здесь следующая:
считывать данные из Socket с помощью Spark Streaming каждые N секунд
регистрировать полученные данные в виде таблицы SQL
в качестве справочных данных будет считываться больше данных из HDFS и т. д., они также будут зарегистрированы как таблицы SQL
идея состоит в том, чтобы выполнять произвольные SQL-запросы к комбинированным потоковым и справочным данным.
См. Фрагмент кода ниже. Я вижу, что данные записываются на диск «изнутри» цикла forEachRDD, но данные той же зарегистрированной таблицы SQL пусты при записи «вне» цикла forEachRDD.
Пожалуйста, выскажите свое мнение / предложения, чтобы исправить это. Также приветствуется любой другой механизм для достижения вышеуказанной «цели».
case class Record(id:Int, status:String, source:String)
object SqlApp2 {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("SqlApp2").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
// Create the streaming context with a 10 second batch size
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
var alldata:DataFrame=sqlContext.emptyDataFrame
alldata.registerTempTable("alldata")
lines.foreachRDD((rdd: RDD[String], time: Time) => {
import sqlContext.implicits._
// Convert RDD[String] to DataFrame
val data = rdd.map(w => {
val words = w.split(" ")
Record(words(0).toInt, words(1), words(2))}).toDF()
// Register as table
data.registerTempTable("alldata")
data.save("inside/file"+System.currentTimeMillis(), "json", SaveMode.ErrorIfExists) // this data is written properly
})
val dataOutside = sqlContext.sql("select * from alldata")
dataOutside.save("outside/file"+System.currentTimeMillis(), "json", SaveMode.ErrorIfExists) // this data is empty, how to make the SQL table registered inside the forEachRDD loop visible for rest of application
ssc.start()
ssc.awaitTermination()
}
С уважением
MK