Как сделать поток DStream Spark в виде таблицы SQL

Цель здесь следующая:

  • считывать данные из Socket с помощью Spark Streaming каждые N секунд

  • регистрировать полученные данные в виде таблицы SQL

  • в качестве справочных данных будет считываться больше данных из HDFS и т. д., они также будут зарегистрированы как таблицы SQL

  • идея состоит в том, чтобы выполнять произвольные SQL-запросы к комбинированным потоковым и справочным данным.

См. Фрагмент кода ниже. Я вижу, что данные записываются на диск «изнутри» цикла forEachRDD, но данные той же зарегистрированной таблицы SQL пусты при записи «вне» цикла forEachRDD.

Пожалуйста, выскажите свое мнение / предложения, чтобы исправить это. Также приветствуется любой другой механизм для достижения вышеуказанной «цели».

case class Record(id:Int, status:String, source:String)

object SqlApp2 {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SqlApp2").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // Create the streaming context with a 10 second batch size
    val ssc = new StreamingContext(sc, Seconds(10))

    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    var alldata:DataFrame=sqlContext.emptyDataFrame
    alldata.registerTempTable("alldata")

    lines.foreachRDD((rdd: RDD[String], time: Time) => {
      import sqlContext.implicits._

      // Convert RDD[String] to DataFrame
      val data = rdd.map(w => {
        val words = w.split(" ")
        Record(words(0).toInt, words(1), words(2))}).toDF()

      // Register as table
      data.registerTempTable("alldata")
      data.save("inside/file"+System.currentTimeMillis(), "json", SaveMode.ErrorIfExists)  // this data is written properly
    })

    val dataOutside = sqlContext.sql("select * from alldata")
    dataOutside.save("outside/file"+System.currentTimeMillis(), "json", SaveMode.ErrorIfExists) // this data is empty, how to make the SQL table registered inside the forEachRDD loop visible for rest of application

    ssc.start()
    ssc.awaitTermination()
  }

С уважением

MK


person M.K    schedule 13.12.2015    source источник


Ответы (1)


Насколько я понимаю, вы можете иметь таблицу создания только внутри блока, такого как 'foreachRDD', если вы не перейдете на Маршрут структурированной потоковой передачи. С вашим подходом вы можете использовать скользящее окно для хранения определенного количества данных в вашей таблице. Ниже я привел соответствующий код.

// You could create a window of 1 minute to run your query
val windowedStream = lines.window(Seconds(60))

windowedStream.foreachRDD((rdd: RDD[String], time: Time) => {
  import sqlContext.implicits._
  val data = rdd.map(w => {
    val words = w.split(" ")
    Record(words(0).toInt, words(1), words(2))
  }).toDF()
  data.createOrReplaceTempView("alldata")

  // You can read your other data source and convert it into a DF table
  // and join with the 'alldata' table
  val dataInside = sqlContext.sql("select * from alldata")
  dataInside.show()
})

Надеюсь это поможет.

Обратите внимание, что структурированная потоковая передача находится на начальной стадии и имеет очень ограниченную функциональность.

person Kaptrain    schedule 14.12.2016