Spark не может получить события из Amazon Kinesis

Недавно я пытался получить события чтения Spark из Kinesis, но у меня проблемы с получением событий. Хотя Spark может подключаться к Kinesis и получать метаданные от Kinesis, он не может получать от него события. Он всегда возвращает нулевые элементы.

Ошибок нет, просто возвращаются пустые результаты. Spark может получать метаданные (например, количество шардов в кинезисе и т. Д.).

Я использовал эти [1 и 2] руководства, чтобы заставить его работать, но пока мне не повезло. Я также попробовал несколько предложений от SO [3]. В кластере достаточно ресурсов / ядер.

Мы видели конфликт версий в Protobuf Version между Spark и Kinesis, который также может быть причиной такого поведения. Spark использует protobuf-java версии 2.5.0, а kinesis, вероятно, использует protobuf-java-2.6.1.jar.

Просто интересно, сталкивался ли кто-нибудь с таким поведением или есть искра, работающая с кинезисом.

Пробовали с Spark 1.5.0, Spark 1.6.0.

  1. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
  2. https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

  3. Образец Apache Spark Kinesis не работает


person Yash Sharma    schedule 23.02.2016    source источник


Ответы (1)


Отвечая на свой вопрос -

Я добился некоторого успеха с интеграцией Spark Kinesis, и ключевым моментом является unionStreams.foreachRDD.

Доступны 2 версии foreachRDD.

  • unionStreams.foreachRDD
  • unionStreams.foreachRDD ((rdd: RDD [Array [Byte]], время: Время)

По какой-то причине первый не может дать мне результатов, но переход на второй дает мне результаты, как и ожидалось. Еще предстоит выяснить причину.

Добавление фрагмента кода ниже для справки.

Также подумайте об изменении этого. Мне это тоже помогло -

"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1",  // Works

Надеюсь, это кому-то поможет :)

Спасибо всем за помощь.

val kinesisStreams = (0 until numStreams).map {
  count =>
    val stream = KinesisUtils.createStream(
      ssc,
      consumerName,
      streamName,
      endpointUrl,
      regionName,
      InitialPositionInStream.TRIM_HORIZON,
      kinesisCheckpointInterval,
      StorageLevel.MEMORY_AND_DISK_2
    )

    stream
}
val unionStreams = ssc.union(kinesisStreams)

println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")

/*unionStreams.foreachRDD{ // Doesn't Work !!
  rdd =>
    println(rdd.count)
    println("rdd isempty:" + rdd.isEmpty)
}*/ 
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
  println(rdd.count)
  println("rdd isempty:" + rdd.isEmpty)
  }
)

ssc.start()
ssc.awaitTermination()
person Yash Sharma    schedule 23.02.2016