Как добавить три столбца, которые являются целыми числами в агрегации Spark sql

Я столкнулся с одной проблемой - это агрегация Spark sql. У меня есть один фрейм данных, из которого я загружаю записи из apache phoenix.

val df = sqlContext.phoenixTableAsDataFrame(
  Metadata.tables(A.Test), Seq("ID", "date", "col1", "col2","col3"),
  predicate = Some("\"date\" = " + date), zkUrl = Some(zkURL))

В другом фрейме данных мне нужно агрегировать на основе идентификатора и даты, а затем суммировать col1, col2, col3, т.е.

val df1 = df.groupBy($"ID", $"date").agg(
  sum($"col1" + $"col2" + $"col3").alias("col4"))

Но при суммировании получаю неверный результат. Как мы можем просуммировать все столбцы (col1, col2, col3) и назначить их col4?

Пример:

Предположим, если данные такие:

ID,date,col1,col2,col3
1,2017-01-01,5,10,12
2,2017-01-01,6,9,17
3,2017-01-01,2,3,7
4,2017-01-01,5,11,13

Ожидаемый результат:

ID,date,col4 
1,2017-01-01,27
2,2017-01-01,32
3,2017-01-01,12
4,2017-01-01,29

person Nikhil Tiwari    schedule 14.02.2018    source источник
comment
Каков ваш вклад и ожидаемый результат?   -  person koiralo    schedule 14.02.2018
comment
я обновил сообщение. пожалуйста, проверьте   -  person Nikhil Tiwari    schedule 14.02.2018
comment
не можете ли вы получить результат с помощью `data.groupBy ($ ID) .agg (sum ($ col1 + $ col2 + $ col3) .alias (col4)). show (false)` что не так с этим   -  person koiralo    schedule 14.02.2018
comment
это сработало? какой неверный результат вы получили?   -  person koiralo    schedule 14.02.2018
comment
спасибо, Шанкар за ответ, я думаю, проблема связана с моим рабочим набором данных. Я постараюсь решить эту проблему и обновить здесь, если я обнаружу, что это имеет отношение к публикации.   -  person Nikhil Tiwari    schedule 14.02.2018
comment
имеют нулевые значения в столбце из-за этого я получаю эту проблему. обработал нулевые значения, и теперь я получаю желаемый результат   -  person Nikhil Tiwari    schedule 15.02.2018


Ответы (1)


Я получаю правильный результат с этим кодом:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.sql.types.{IntegerType,  StructField, StructType}

  val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
    Seq(
      Row(1, 1, 5, 10, 12 ),
      Row(2, 1, 6, 9,  17 ),
      Row(3, 1, 2, 3,  7),
      Row(4, 1, 5, 11, 13)

    )
  )

  val schema: StructType = new StructType()
    .add(StructField("id",    IntegerType,  false))
    .add(StructField("date",  IntegerType, false))
    .add(StructField("col1",  IntegerType, false))
    .add(StructField("col2",  IntegerType, false))
    .add(StructField("col3",  IntegerType, false))
  val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)

  val df = df0.groupBy(col("id"), col("date")).agg(sum(col("col1") + col("col2") + col("col3")).alias("col4")).sort("id")

  df.show()

Результат:

+---+----+----+
| id|date|col4|
+---+----+----+
|  1|   1|  27|
|  2|   1|  32|
|  3|   1|  12|
|  4|   1|  29|
+---+----+----+

Это то, что тебе надо?

person astro_asz    schedule 14.02.2018
comment
Для этого набора данных (манекен, приведенный выше) оба кода работают и у вас, и у меня. Может быть, проблема связана с моим набором данных для работы. Спасибо за ответ. - person Nikhil Tiwari; 14.02.2018