Потоковая передача Spark и концентраторы событий Azure mapWithState

Я успешно интегрировал код для извлечения сообщений из концентратора событий и их обработки с помощью искровой / искровой потоковой передачи. Теперь я перехожу к управлению состоянием по мере прохождения сообщений. Это код, который я использую, который по большей части является адаптацией https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html

По сути, это работает с фиктивным источником, он работает с одним потоком в одном разделе, но не работает для объединенного оконного потока .. Хотя я мог создать несколько экземпляров потока, по одному для каждого раздела, это как бы побеждает точку объединение и окно .. + мои попытки заставить его работать таким образом не увенчались успехом. Я как бы застрял в поисках вдохновения, куда мне теперь идти ... если у кого-то есть какие-то идеи, которые были бы грандиозными ...

val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate()

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))
streamingContext.checkpoint(inputOptions.checkpointDir)

//derive the stream and window
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10))

val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L)))
val stateSpec = StateSpec.function(trackStateFunc _)
  .initialState(initialRDD)
  .numPartitions(2)
  .timeout(Seconds(60))

val eventStream = eventHubsWindowedStream
  .map(messageStr => {
    //parse teh event
    var event = gson.fromJson(new String(messageStr), classOf[Event])

    //return a tuble of key/value pair
    (event.product_id.toString, 1)
  })

val eventStateStream = eventStream.mapWithState(stateSpec)

val stateSnapshotStream = eventStateStream.stateSnapshots()
stateSnapshotStream.print()

stateSnapshotStream.foreachRDD { rdd =>
  import sparkSession.implicits._
  rdd.toDF("word", "count").registerTempTable("batch_word_count")
}

streamingContext.remember(Minutes(1))  

streamingContext

person David Crossland    schedule 15.02.2017    source источник
comment
это не работает для объединенного оконного потока. Что не работает?   -  person Yuval Itzchakov    schedule 15.02.2017
comment
Извините, по сути, государственная функция никогда не вызывается. Я не могу отлаживать до этого момента. Когда я использую образец кода, все в порядке ... и когда я использую один поток, все в порядке ... но не тогда, когда я использую объединенный поток или окно ...   -  person David Crossland    schedule 15.02.2017
comment
Вы пробовали выполнять отладку локально в своей среде IDE?   -  person Yuval Itzchakov    schedule 15.02.2017
comment
Действительно, локальная отладка im выполняется успешно с основным количеством примеров / слов.   -  person David Crossland    schedule 15.02.2017
comment
Его странный, ive просмотрел код потоковой передачи концентратора событий, и все выглядит нормально, EventHubsUtils.createUnionStream разрешает этот код val streams = (0 until partitionCount).map { i => createStream(streamingContext, eventhubsParams, i.toString, storageLevel) } streamingContext.union(streams), который возвращает DStream[Array[Byte]], а eventHubsStream.window(Seconds(10)) разрешает DStream[Array[Byte]], я могу сопоставить это с DStream[(String, Int)], который я могу вызвать mapWithState и т. Д. насколько я могу судить, я правильно строю код   -  person David Crossland    schedule 15.02.2017


Ответы (1)


Я решил свою проблему, так как в конечном итоге я использовал прямой поток, и все мои проблемы исчезли. Я избежал этого, поскольку каталог прогресса поддерживает только HDFS или ADL, и теперь я больше не могу тестировать локально.

EventHubsUtils.createDirectStreams (streamingContext, inputOptions.namespace, inputOptions.hdfs, Map (inputOptions.eventhub -> GetEventHubParams (inputOptions)))

Тем не менее, объединенный поток не работает ... Теперь мне просто нужно выяснить, как удалить каталог прогресса в HDFS !!

person David Crossland    schedule 17.02.2017