Я изучаю механизм Spark CodeGen, но меня смущает то, как Spark преобразует преобразование/действие RDD в логический план. Приложение Spark выглядит следующим образом:
def sparkTest(): Any = {
val spark = SparkInit.spark
import spark.implicits._
val data = Seq(1, 2, 3, 4, 5, 6, 7, 8)
// closure start
val const = 3
def mulBy(factor: Double) = (x: Double) => factor * x
val mulByval = mulBy(const)
// closure end
val testRDD = data.toDS()
val filterRDD = testRDD.filter(i =>
mulByval(i) <= 7
)
filterRDD.collect()
filterRDD.foreach(i =>
println(i)
)
}
Я попытался отследить исходный код, но обнаружил, что когда код переходит в Dataset.collect, queryExecution уже сгенерирован.
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
Выполнение запроса выглядит следующим образом
== Parsed Logical Plan ==
'TypedFilter <function1>, int, [StructField(value,IntegerType,false)], unresolveddeserializer(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))
+- LocalRelation [value#46]
== Analyzed Logical Plan ==
value: int
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], cast(value#46 as int)
+- LocalRelation [value#46]
== Optimized Logical Plan ==
TypedFilter <function1>, int, [StructField(value,IntegerType,false)], value#46: int
+- LocalRelation [value#46]
== Physical Plan ==
*Filter <function1>.apply$mcZI$sp
+- LocalTableScan [value#46]
Но я не могу найти, когда и где генерируется Логический план. Я что-то пропустил?