Запуск в тупик при выполнении потоковой агрегации из Kafka

Несколько дней назад я опубликовал еще один вопрос с похожими пожеланиями:

Мне удалось получить хотя бы "рабочее" решение, а это значит, что сам процесс вроде работает правильно. Но, поскольку я чертовски новичок в Spark, я, кажется, упустил некоторые вещи о том, как правильно создавать такие приложения (с точки зрения производительности / вычислений) ...

Что я хочу сделать:

  1. Загружать данные истории из ElasticSearch при запуске приложения

  2. Начните слушать тему Kafka при запуске (с событиями продаж, передаваемыми в виде строк JSON) с помощью Spark Streaming

  3. Для каждого входящего RDD выполните агрегирование для каждого пользователя.
  4. Объедините результаты 3. с историей
  5. Сгруппируйте новые значения, такие как общий доход, на пользователя.
  6. Используйте результаты из 5. как новую «историю» для следующей итерации.

Мой код следующий:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object ReadFromKafkaAndES {
  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("kafka").setLevel(Level.WARN)

    val checkpointDirectory = "/tmp/Spark"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")

    val topicsSet = Array("sales").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(15))
    ssc.checkpoint(checkpointDirectory)

    //Create SQLContect
    val sqlContext = new SQLContext(sc)

    //Get history data from ES
    var history = sqlContext.esDF("data/salesaggregation")

    //Kafka settings
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Register temporary table for the aggregated history
        history.registerTempTable("history")

        println("--- History -------------------------------")
        history.show()

        //Parse JSON as DataFrame
        val saleEvents = sqlContext.read.json(rdd.values)

        //Register temporary table for sales events
        saleEvents.registerTempTable("sales")

        val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")

        println("--- Sales ---------------------------------")
        sales.show()

        val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")

        println("--- Aggregation ---------------------------")
        agg.show()

        //This is our new "history"
        history = agg

        //Cache results
        history.cache()

        //Drop temporary table
        sqlContext.dropTempTable("history")

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

Вычисления, кажется, работают правильно:

--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp|         productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...|         91|       12.05|    23|
|2015-07-22 12:50:...|Buffer(256, 384, ...|         41|        7.05|    24|
+--------------------+--------------------+-----------+------------+------+

--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp|      totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
|    23|2015-07-29 09:17:...|            255.59|        208|
|    24|2015-07-29 09:17:...|226.08999999999997|        196|
+------+--------------------+------------------+-----------+

--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp|      totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
|    23|2015-07-29 09:17:...| 267.6400001907349|        299|
|    24|2015-07-29 09:17:...|233.14000019073484|        237|
+------+--------------------+------------------+-----------+

но если приложение запускает несколько итераций, я вижу, что производительность ухудшается:

Потоковые графики

Я также вижу большое количество пропущенных задач, которое увеличивается с каждой итерацией:

Пропущенные задачи

Графики первой итерации выглядят как

введите здесь описание изображения

Графики второй итерации выглядят как

введите здесь описание изображения

Чем больше итераций прошло, тем длиннее будет график с большим количеством пропущенных шагов.

В основном я думаю, что проблема заключается в сохранении результатов итераций для следующей итерации. К сожалению, после того, как я попробовал много разных вещей и прочитал документацию, я не смог найти решение для этого. Любая помощь приветствуется. Спасибо!


person Tobi    schedule 29.07.2015    source источник
comment
Я бы хотел, чтобы все вопросы были так хорошо задокументированы.   -  person maasg    schedule 29.07.2015


Ответы (1)


Это задание потоковой передачи не находится в «тупике», но время его выполнения экспоненциально увеличивается с каждой итерацией, в результате чего задание потоковой передачи завершается неудачей раньше, чем позже.

Итерационный процесс union-> reduce-> union-> reduce ... в RDD создает постоянно растущую линию RDD. Каждая итерация добавляет к этим линиям зависимости, которые необходимо вычислить на следующей итерации, что также приводит к увеличению времени выполнения. График зависимости (происхождения) ясно показывает это.

Одно из решений - регулярно проверять RDD.

history.checkpoint()

Вы также можете изучить возможность замены процесса объединения / сокращения на updateStateByKey < / а>

person maasg    schedule 29.07.2015
comment
Большое спасибо за ответ. Я предполагал, что вызов cache() или persist() вызовет проблемы с происхождением, но, по всей видимости, я неправильно понял этот момент. Поскольку history - это DataFrame, а не RDD, я думаю, что не могу использовать _6 _... - person Tobi; 29.07.2015
comment
Извините, я просмотрел документацию, там есть history.rdd.checkpoint(). Использование этого заставляет весь процесс останавливаться еще быстрее ... Похоже, что использование cache() было бы лучшим вариантом. Я действительно дергаю за это за волосы. Возможно, мне придется выбрать updateStateByKey(), но тогда я потеряю все удобство обработки полезных данных JSON через _4 _... - person Tobi; 29.07.2015
comment
Оказывается, если я удалю команды show(), каждая итерация будет выполняться менее чем за 100 мс ... Честно говоря, я действительно не понимаю, почему это так. Это потому, что данные извлекаются из разделов? - person Tobi; 29.07.2015
comment
@Tobi, если вы удалите show(), какое действие будет выполнено на базовом RDD? Я предполагаю, что никакое действие не будет выполнено, поэтому ничего не делает. - person maasg; 29.07.2015
comment
Похоже, да. Если я добавлю history.rdd.checkpoint(), это сразу же остановит процессы, так что я сразу же увижу некоторую задержку планирования, которая увеличивает каждую итерацию. Почему-то я не понимаю, как это (должно) работать. Объем данных, с которыми я тестирую, смехотворно мал (~ 30 событий / 15 секунд). - person Tobi; 29.07.2015
comment
Все еще не уверен, почему контрольные точки не решают проблемы. Может у вас есть другая идея? Попробую завтра метод updateStateByKey. К сожалению, я, вероятно, потеряю удобную обработку JSON по пути ... - person Tobi; 29.07.2015