tidyr :: spread () с простыми Scala и Spark (строки в столбцы)

мой прототип (написанный на R с пакетами dplyr и tidyr) упирается в стену с точки зрения вычислительной сложности - даже на моем мощная рабочая станция. Поэтому я хочу перенести код на Spark с помощью Scala.

Я просмотрел все преобразования, действия, functions (SparkSQL) и операции с столбцами (также SparkSQL) и обнаружил все эквиваленты функций, кроме одного для функции tidyr::spread(), доступной в R.

df %>% tidyr::spread(key = COL_KEY , value = COL_VAL) в основном распределяет пару "ключ-значение" по нескольким столбцам. Например. стол

COL_KEY | COL_VAL
-----------------
A       | 1
B       | 1
A       | 2

будет преобразован в

A       | B
------------
1       | 0
0       | 1
2       | 1

Если нет готового решения: не могли бы вы указать мне правильное направление? Может быть, функция, определяемая пользователем?

Я могу выбрать версию Spark (и Scala) бесплатно (поэтому я бы выбрал последнюю, 2.0.0).

Спасибо!


person Boern    schedule 12.09.2016    source источник
comment
stackoverflow.com/questions/31927500/   -  person Shankar    schedule 12.09.2016


Ответы (1)


Готово, но требует перемешивания:

df
  // A dummy unique key to perform grouping
  .withColumn("_id", monotonically_increasing_id)
  .groupBy("_id")
  .pivot("COL_KEY")
  .agg(first("COL_VAL"))
  .drop("_id")

// +----+----+
// |   A|   B|
// +----+----+
// |   1|null|
// |null|   1|
// |   2|null|
// +----+----+

При желании вы можете следовать за ним с помощью .na.fill(0).

Вручную без перемешивания:

//  Find distinct keys
val keys = df.select($"COL_KEY").as[String].distinct.collect.sorted

// Create column expressions for each key
val exprs =  keys.map(key => 
  when($"COL_KEY" === key, $"COL_VAL").otherwise(lit(0)).alias(key)
)

df.select(exprs: _*)

// +---+---+
// |  A|  B|
// +---+---+
// |  1|  0|
// |  0|  1|
// |  2|  0|
// +---+---+
person zero323    schedule 12.09.2016
comment
В случае, если кто-то попробует это: не забудьте импортировать свет и при использовании import org.apache.spark.sql.functions.{lit, when} (второй вариант) - person Boern; 07.11.2016