Spark aggregateByKey в наборе данных

Вот пример aggregateByKey на mutable.HashSet [String], написанный @ bbejeck

val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

Но когда я перешел на набор данных, я получил следующую ошибку, потому что Spark 2.0 (версия, которую я использую) не поддерживает aggregateByKey в наборе данных?

java.lang.NullPointerException
at org.apache.spark.sql.Dataset.schema(Dataset.scala:393)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:339)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)

Вот код:

case class Food(name: String,
                price: String,
                e_date: String)    
rdd.aggregateByKey( Seq(Food("", "", "")).toDS )( 
                    (f1,f2) => f1.union(f2), 
                    (f1,f2) => f1.union(f2))
/////////
found f1 = Invalid tree; null:
                    null

Любые идеи, почему это происходит, заранее спасибо!


person faustineinsun    schedule 25.08.2016    source источник


Ответы (1)


Да, я думаю, aggregateByKey работает только с rdd.
вот документация (для python)
http://spark.apache.org/docs/latest/api/python/pyspark.html

Удалите .toDS и попробуйте код. Возможно, преобразовать его в DS после завершения агрегации (не уверен, будет ли это лучше по производительности).

person Bigby    schedule 25.08.2016