Я очень доволен Spark 2.0 DataSets из-за безопасности типов во время компиляции. Но вот пара проблем, которые я не могу решить, я также не нашел для этого хорошей документации.
Проблема №1 - операция разделения по агрегированному столбцу - Рассмотрим приведенный ниже код - у меня есть DataSet [MyCaseClass], и я хотел сгруппироватьByKey на c1, c2, c3 и sum (c4) / 8. Приведенный ниже код работает хорошо, если я просто вычисляю сумму, но дает ошибку времени компиляции для div (8). Интересно, как я могу добиться следующего.
final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)
val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()
Если я удалю операцию .divide (8) и запустил команду выше, я получу результат ниже.
+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+
Проблема № 2 - преобразование результата groupedByKey в другой типизированный DataFrame - Теперь вторая часть моей проблемы заключается в том, что я хочу снова вывести типизированный DataSet. Для этого у меня есть другой класс case (не уверен, нужен ли он), но я не уверен, как сопоставить с сгруппированным результатом -
final case class AnotherClass(c1: String,
c2: String,
c3: String,
average: Double)
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception
но это снова не удается, за исключением того, что сгруппированный по ключевым словам результат не отображается напрямую с AnotherClass.
PS: любое другое решение для достижения вышеуказанного более чем приветствуется.