Spark DataFrame: найдите и установите основной корень для дочернего элемента

У меня есть следующий фрейм данных Apache Spark:

Родитель - Дочерний
A1 - A10
A1 - A2
A2 - A3
A3 - A4
A5 - A7
A7 - A6
A8 - A9

Этот DataFrame отображает связь между родительским и дочерним объектами. Логически это выглядит так:  введите описание изображения здесь

Основная цель - установить главный рут для каждого ребенка. Это означает, что у нас должен быть следующий фрейм данных:

Родитель - Дочерний
A1 - A10
A1 - A2
A1 - A3
A1 - A4
A5 - A7
A5 - A6
A8 - A9


  • Все должно реализовываться с использованием Apache Spark.
  • Количество узлов не ограничено. Это означает, что алгоритмы должны работать независимо от количества узлов

person Yudovin Artsiom    schedule 04.10.2019    source источник
comment
Можете ли вы дать образец ответа, что вы ищете?   -  person vaquar khan    schedule 05.10.2019


Ответы (1)


Я считаю, что с подходом, описанным ниже, вы можете этого добиться.

val input_rdd = spark.sparkContext.parallelize(List(("A1", "A10"), ("A1", "A2"), ("A2", "A3"), ("A3", "A4"), ("A5", "A7"), ("A7", "A6"), ("A8", "A9"), ("A4", "A11"), ("A11", "A12"), ("A6", "A13")))
val input_df = input_rdd.toDF("Parent", "Child")
input_df.createOrReplaceTempView("TABLE1")
input_df.show()

Вход

+------+-----+
|Parent|Child|
+------+-----+
|    A1|  A10|
|    A1|   A2|
|    A2|   A3|
|    A3|   A4|
|    A5|   A7|
|    A7|   A6|
|    A8|   A9|
|    A4|  A11|
|   A11|  A12|
|    A6|  A13|
+------+-----+
# # linkchild function to get the root    
      def linkchild(df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("TEMP")
    val link_child_df = spark.sql("""select distinct a.parent, b.child from TEMP a inner join TEMP b on a.parent = b.parent or a.child = b.parent""")
    link_child_df
    }
# # findroot function to validate and generate output
    def findroot(rdf: DataFrame) {
      val link_child_df = linkchild(rdf)
      link_child_df.createOrReplaceTempView("TEMP1")
      val cnt = spark.sql("""select * from table1 where child not in (select  child from (select * from (select distinct a.parent, b.child from TEMP1 a   inner join TEMP1 b on a.parent = b.parent or a.child = b.parent
    where a.parent not in(select distinct child from                                                                               TABLE1))))""").count()
      if (cnt == 0) {
        spark.sql("""select * from (select distinct a.parent, b.child from   TEMP1 a inner join TEMP1 b on a.parent = b.parent or a.child = b.parent
    where a.parent not in(select distinct child from TABLE1)) order by parent, child""").show
      } else {
        findroot(link_child_df)
      }
    }
# # Calling findroot function for the first time with input_df which in turn calls linkchild function till it reaches target
    findroot(input_df)

Выход

+------+-----+
|parent|child|
+------+-----+
|    A1|  A10|
|    A1|  A11|
|    A1|  A12|
|    A1|  A14|
|    A1|  A15|
|    A1|  A16|
|    A1|  A17|
|    A1|  A18|
|    A1|   A2|
|    A1|   A3|
|    A1|   A4|
|    A5|  A13|
|    A5|   A6|
|    A5|   A7|
|    A8|   A9|
+------+-----+
person sangam.gavini    schedule 05.10.2019
comment
A12 не существует в окончательном результате для этого ввода val input_rdd = spark.sparkContext.parallelize(List(("A1", "A10"), ("A1", "A2"), ("A2", "A3"), ("A3", "A4"), ("A5", "A7"), ("A7", "A6"), ("A8", "A9"), ("A4", "A11"), ("A11", "A12"), ("A6", "A13"))) - person Yudovin Artsiom; 05.10.2019
comment
A12 недоступен, потому что родитель A12 является дочерним элементом A4, а A4 - дочерним элементом A3, а A3 - дочерним элементом A2, а A2 - дочерним элементом A1 .. в соответствии с вашим первым примером это похоже на то, что вы рассматриваете только A1 в этой группе. Это причина того, что у нас НЕ ВХОДИТ в окончательный запрос, который исключил A12 в окончательном результате. Если вы хотите увидеть A12 в конечном результате, удалите NOT IN в запросе, и вы его получите. - person sangam.gavini; 05.10.2019
comment
я ожидаю, что алгоритмы обнаружат главный корень для каждого ребенка. В этом случае алгоритм также должен отображать в окончательном результате A1 - A12. Если я удалю NOT IN в запросе, он будет работать не так, как я ожидал. - person Yudovin Artsiom; 05.10.2019
comment
Только что обновили мой ответ, пожалуйста, подтвердите его своими сценариями. - person sangam.gavini; 05.10.2019
comment
в этом выводе у вас есть A11 - A12. Это неверно, потому что основным родителем для A11 является A1, поэтому A1 также является основным родителем для A12. На выходе должно быть A1 - A12. - person Yudovin Artsiom; 05.10.2019
comment
Я только что понял и поправил, проверьте сейчас. - person sangam.gavini; 05.10.2019
comment
Мне очень жаль, я думаю, у нас возникли недоразумения. Ваши алгоритмы работают только с конечным числом узлов, например от A1 до A12. Я пытаюсь найти решение для общих случаев (независимо от количества узлов). Я обновил описание. На самом деле, у меня много узлов (их может быть больше 10000) и я пытаюсь найти общее решение. - person Yudovin Artsiom; 05.10.2019
comment
Я только что пробовал использовать этот ввод - input_rdd = spark.sparkContext.parallelize (List ((A1, A10), (A1, A2), (A2, A3), (A3, A4), (A5, A7), (A7, A6), (A8, A9), (A4, A11), (A11, A12), (A6, A13), (A12, A14), (A14, A15), (A15, A16), (A16, A17) )) и, похоже, у меня все работает нормально ... это то, что вы ожидаете. - person sangam.gavini; 05.10.2019
comment
количество узлов может быть больше 10000. Алгоритм не обязательно зависит от количества узлов. Это должно быть обычное решение. Например, если я добавлю 100 узлов (... ("A17", "A18"),("A18", "A19"),("A19", "A20")....("A99", "A100") ), это сработает? - person Yudovin Artsiom; 05.10.2019
comment
Я только что обновил свой ответ, теперь я могу сказать, что он работает для любого количества узлов ... проверьте! - person sangam.gavini; 05.10.2019