Несколько дней назад я опубликовал еще один вопрос с похожими пожеланиями:
Мне удалось получить хотя бы "рабочее" решение, а это значит, что сам процесс вроде работает правильно. Но, поскольку я чертовски новичок в Spark, я, кажется, упустил некоторые вещи о том, как правильно создавать такие приложения (с точки зрения производительности / вычислений) ...
Что я хочу сделать:
Загружать данные истории из ElasticSearch при запуске приложения
Начните слушать тему Kafka при запуске (с событиями продаж, передаваемыми в виде строк JSON) с помощью Spark Streaming
- Для каждого входящего RDD выполните агрегирование для каждого пользователя.
- Объедините результаты 3. с историей
- Сгруппируйте новые значения, такие как общий доход, на пользователя.
- Используйте результаты из 5. как новую «историю» для следующей итерации.
Мой код следующий:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.elasticsearch.spark.sql._
import org.apache.log4j.Logger
import org.apache.log4j.Level
object ReadFromKafkaAndES {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("kafka").setLevel(Level.WARN)
val checkpointDirectory = "/tmp/Spark"
val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[4]")
conf.set("es.nodes", "localhost")
conf.set("es.port", "9200")
val topicsSet = Array("sales").toSet
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(15))
ssc.checkpoint(checkpointDirectory)
//Create SQLContect
val sqlContext = new SQLContext(sc)
//Get history data from ES
var history = sqlContext.esDF("data/salesaggregation")
//Kafka settings
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
// Create direct kafka stream with brokers and topics
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
//Iterate
messages.foreachRDD { rdd =>
//If data is present, continue
if (rdd.count() > 0) {
//Register temporary table for the aggregated history
history.registerTempTable("history")
println("--- History -------------------------------")
history.show()
//Parse JSON as DataFrame
val saleEvents = sqlContext.read.json(rdd.values)
//Register temporary table for sales events
saleEvents.registerTempTable("sales")
val sales = sqlContext.sql("select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId")
println("--- Sales ---------------------------------")
sales.show()
val agg = sqlContext.sql("select a.userId, max(a.latestSaleTimestamp) as latestSaleTimestamp, sum(a.totalRevenue) as totalRevenue, sum(a.totalPoints) as totalPoints from ((select userId, latestSaleTimestamp, totalRevenue, totalPoints from history) union all (select userId, cast(max(saleTimestamp) as Timestamp) as latestSaleTimestamp, sum(totalRevenue) as totalRevenue, sum(totalPoints) as totalPoints from sales group by userId)) a group by userId")
println("--- Aggregation ---------------------------")
agg.show()
//This is our new "history"
history = agg
//Cache results
history.cache()
//Drop temporary table
sqlContext.dropTempTable("history")
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
Вычисления, кажется, работают правильно:
--- History -------------------------------
+--------------------+--------------------+-----------+------------+------+
| latestSaleTimestamp| productList|totalPoints|totalRevenue|userId|
+--------------------+--------------------+-----------+------------+------+
|2015-07-22 10:03:...|Buffer(47, 1484, ...| 91| 12.05| 23|
|2015-07-22 12:50:...|Buffer(256, 384, ...| 41| 7.05| 24|
+--------------------+--------------------+-----------+------------+------+
--- Sales ---------------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 255.59| 208|
| 24|2015-07-29 09:17:...|226.08999999999997| 196|
+------+--------------------+------------------+-----------+
--- Aggregation ---------------------------
+------+--------------------+------------------+-----------+
|userId| latestSaleTimestamp| totalRevenue|totalPoints|
+------+--------------------+------------------+-----------+
| 23|2015-07-29 09:17:...| 267.6400001907349| 299|
| 24|2015-07-29 09:17:...|233.14000019073484| 237|
+------+--------------------+------------------+-----------+
но если приложение запускает несколько итераций, я вижу, что производительность ухудшается:
Я также вижу большое количество пропущенных задач, которое увеличивается с каждой итерацией:
Графики первой итерации выглядят как
Графики второй итерации выглядят как
Чем больше итераций прошло, тем длиннее будет график с большим количеством пропущенных шагов.
В основном я думаю, что проблема заключается в сохранении результатов итераций для следующей итерации. К сожалению, после того, как я попробовал много разных вещей и прочитал документацию, я не смог найти решение для этого. Любая помощь приветствуется. Спасибо!