Как анализировать данные JSON с помощью Spark-Scala

Мне необходимо проанализировать данные JSON, как показано в ожидаемых результатах ниже, в настоящее время я не понимаю, как включить имя сигналов (ABS, ADA, ADW) в столбец сигналов. Любая помощь приветствуется.

Я пробовал что-то, что дает результаты, как показано ниже, но мне нужно будет также включить все сигналы в столбец SIGNAL, который показан в ожидаемых результатах.

jsonDF.select(explode($"ABS") as "element").withColumn("stime", col("element.E")).withColumn("can_value", col("element.V")).drop(col("element")).show()

+-------------+--------- --+
|        stime|can_value   |
+-------------+---------   +
|value of E   |value of V  |
+-------------+----------- +

df.printSchema

 -- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |-- APP: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)

I will need output like below:

-----------------+-------------+---------+
|SIGNAL        |stime            |can_value|
+-----------------+-------------+---------+
|ABS           |value of E   |value of V  |
|ADA           |value of E   |value of V  |
|ADW           |value of E   |value of V  |
+-----------------+-------------+---------+

person Anil Kumar    schedule 25.07.2019    source источник


Ответы (1)


Чтобы получить ожидаемый результат и вставить значения в столбец «Сигнал»:

jsonDF.select(explode($"ABS") as "element")
    .withColumn("stime", col("element.E"))
    .withColumn("can_value", col("element.V"))
    .drop(col("element"))
    .withColumn("SIGNAL",lit("ABS"))
    .show()

И обобщенная версия вышеупомянутого подхода:

(На основе результата df.printSchema при условии, что у вас есть значения сигналов в качестве имен столбцов, и эти столбцы содержат массив, содержащий элементы структуры формы (E, V))

val columns:Array[String] = df.columns

var arrayOfDFs:Array[DataFrame] = Array()

for(col_name <- columns){

  val temp = df.selectExpr("explode("+col_name+") as element")
    .select(
      lit(col_name).as("SIGNAL"),
      col("element.E").as("stime"),
      col("element.V").as("can_value"))

  arrayOfDFs = arrayOfDFs :+ temp
}

val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)
person Arati Nagmal    schedule 25.07.2019
comment
Привет @Arati, Спасибо за вашу помощь, я добираюсь туда, но в настоящее время застрял с одной проблемой, согласно df.printSchema, последний jsonDF многократно печатает только последнее имя сигнала (APP) и последнее значение E, V для сигнал (приложение). Есть ли способ убедиться, что jsonDF хранит все имена сигналов и соответствующие значения, прежде чем он выйдет из цикла for. Любая помощь приветствуется. - person Anil Kumar; 28.07.2019
comment
@Anil, у меня возникла проблема, и я обновил ответ, попробуйте. Из-за '.withColumn (SIGNAL, when (col (SIGNAL) .isNotNull, col (SIGNAL)). Else (lit (col_name)))' в jsonDF в предыдущем ответе, он копировал последнее имя сигнала для всех строк. - person Arati Nagmal; 29.07.2019
comment
Спасибо за ваше время, я принял ваш ответ. Нужна помощь по следующему запросу: мой df.printSchema выглядит как ниже root | - APP: array (nullable = true) | | - element: struct (containsNull = true) | | | - E: long (nullable = true) | | | - V: double (nullable = true) | - B1X: массив (nullable = true) | | - element: struct (containsNull = true) | | | - E: long (nullable = true) | | | - V: long (nullable = true) | - VIN: строка (nullable = true) в моем последнем jsonDF мне также нужно будет включить VIN (SIGNAL, STIME, CAN_VALUE, VIN) - person Anil Kumar; 29.07.2019
comment
Чтобы добавить больше столбцов в окончательный jsonDF, вам нужно будет выбрать их при создании временного фрейма данных в цикле for, как показано ниже: 'df.selectExpr (explode (+ col_name +) as element, VIN) .select (.,., Col ( VIN)) ' - person Arati Nagmal; 29.07.2019
comment
Привет @Arati, поскольку VIN не является типом карты / массива, я не могу использовать его в функции разнесения. В коде для (col_name ‹- columns) {....} мне нужно будет удалить VIN, так как Explode выдаст следующую ошибку: org.apache.spark.sql.AnalysisException: не удается разрешить 'explode (VIN)' из-за несоответствие типов данных: входные данные функции разнесения должны быть массивом или типом карты, а не строкой; Но мне понадобится этот VIN в моем последнем jsonDF как (SIGNAL, stime, can_value, VIN). Любая помощь приветствуется. - person Anil Kumar; 29.07.2019
comment
Вы должны использовать VIN вне функции explode () в функции .selectExpr (). - person Arati Nagmal; 29.07.2019
comment
Здравствуйте @Arati, Большое спасибо, большое спасибо за ваше время и усилия :) Все работало нормально. - person Anil Kumar; 29.07.2019
comment
Нет проблем @Anil! - person Arati Nagmal; 30.07.2019