Как Spark преобразует преобразование / действие RDD в логический план?

Я изучаю механизм Spark CodeGen, но меня смущает то, как Spark преобразует преобразование/действие RDD в логический план. Приложение Spark выглядит следующим образом:

 def sparkTest(): Any = {

    val spark = SparkInit.spark

    import spark.implicits._

    val data = Seq(1, 2, 3, 4, 5, 6, 7, 8)

    // closure start

    val const = 3

    def mulBy(factor: Double) = (x: Double) => factor * x

    val mulByval = mulBy(const)

    // closure end

    val testRDD = data.toDS()
    val filterRDD = testRDD.filter(i =>
      mulByval(i) <= 7
    )

    filterRDD.collect()

    filterRDD.foreach(i =>
      println(i)
    )


  }

Я попытался отследить исходный код, но обнаружил, что когда код переходит в Dataset.collect, queryExecution уже сгенерирован.

 def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

Выполнение запроса выглядит следующим образом

== Parsed Logical Plan ==
'TypedFilter <function1>, int, [StructField(value,IntegerType,false)], unresolveddeserializer(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))
+- LocalRelation [value#46]

== Analyzed Logical Plan ==
value: int
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], cast(value#46 as int)
+- LocalRelation [value#46]

== Optimized Logical Plan ==
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], value#46: int
+- LocalRelation [value#46]

== Physical Plan ==
*Filter <function1>.apply$mcZI$sp
+- LocalTableScan [value#46]

Но я не могу найти, когда и где генерируется Логический план. Я что-то пропустил?


person lulijun    schedule 26.10.2017    source источник


Ответы (1)


Здесь есть небольшая путаница. RDD API на самом деле не создает планы. Они существуют как примитивные или старые API, которые наборы данных используют для работы. В вашем конкретном примере план запроса начинает строиться (хотя и лениво), когда вы пишете эту строку.

val testRDD = data.toDS()

После этого у вас больше нет RDD, у вас есть DataSet, который закодирован из результата происхождения «данных». Вы можете увидеть план любого набора данных, вызвав метод объяснения, чтобы получить более подробную информацию.

Итак, подведем итог

  • Каждый набор данных имеет ссылку queryExecution при инициализации по дизайну.
  • У RDD нет логического плана (или какого-либо кода)
  • RDDS, превращенные в наборы данных, имеют план, первый шаг которого проходит через дерево зависимостей RDD.

Глядя на ваш код более подробно, на самом деле RDD никогда не используется. Вы начинаете с коллекции и переходите непосредственно к набору данных, который создает LocalTableScan, который в основном просто скрывает значения в InternalRowRepresentation и распараллеливает их. См. LocalTableScanExec для получения подробной информации.

person RussS    schedule 26.10.2017
comment
Спасибо за ваш ответ. Но у меня есть еще один вопрос. Это то, что я понимаю сейчас: когда набор данных инициализируется, он будет генерировать план по всем преобразованиям и действиям этого набора данных. А Spark будет генерировать исполняемый код на основе плана выполнения. То, что искра будет сериализоваться как замыкание и отправлять своим исполнителям, — это сгенерированный код spark.sql.codegen.wholeStage, а не фактическое преобразование или действие набора данных. Я не уверен, правильно ли я понимаю. - person lulijun; 26.10.2017
comment
Логический план преобразуется в Физический план. Физический план — это набор узлов, которые могут содержать или не содержать информацию о генераторе кодов. Те узлы, которые встречаются на одном и том же этапе, имеют свои методы codegen, объединенные в WholeStageCodgen. Преобразования набора данных возвращают новый набор данных, как правило, с одним дополнительным логическим узлом (или, возможно, со многими). Действия обычно добавляют завершающий логический узел, а затем сразу запускают выполнение соответствующего физического плана. - person RussS; 27.10.2017