Я успешно интегрировал код для извлечения сообщений из концентратора событий и их обработки с помощью искровой / искровой потоковой передачи. Теперь я перехожу к управлению состоянием по мере прохождения сообщений. Это код, который я использую, который по большей части является адаптацией https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html
По сути, это работает с фиктивным источником, он работает с одним потоком в одном разделе, но не работает для объединенного оконного потока .. Хотя я мог создать несколько экземпляров потока, по одному для каждого раздела, это как бы побеждает точку объединение и окно .. + мои попытки заставить его работать таким образом не увенчались успехом. Я как бы застрял в поисках вдохновения, куда мне теперь идти ... если у кого-то есть какие-то идеи, которые были бы грандиозными ...
val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate()
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))
streamingContext.checkpoint(inputOptions.checkpointDir)
//derive the stream and window
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10))
val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L)))
val stateSpec = StateSpec.function(trackStateFunc _)
.initialState(initialRDD)
.numPartitions(2)
.timeout(Seconds(60))
val eventStream = eventHubsWindowedStream
.map(messageStr => {
//parse teh event
var event = gson.fromJson(new String(messageStr), classOf[Event])
//return a tuble of key/value pair
(event.product_id.toString, 1)
})
val eventStateStream = eventStream.mapWithState(stateSpec)
val stateSnapshotStream = eventStateStream.stateSnapshots()
stateSnapshotStream.print()
stateSnapshotStream.foreachRDD { rdd =>
import sparkSession.implicits._
rdd.toDF("word", "count").registerTempTable("batch_word_count")
}
streamingContext.remember(Minutes(1))
streamingContext
val streams = (0 until partitionCount).map { i => createStream(streamingContext, eventhubsParams, i.toString, storageLevel) } streamingContext.union(streams)
, который возвращаетDStream[Array[Byte]]
, аeventHubsStream.window(Seconds(10))
разрешаетDStream[Array[Byte]]
, я могу сопоставить это сDStream[(String, Int)]
, который я могу вызвать mapWithState и т. Д. насколько я могу судить, я правильно строю код - person David Crossland   schedule 15.02.2017