Я хочу выполнить типизированное преобразование, чтобы заменить все значения некоторых столбцов в наборе данных. Я знаю, что это возможно с помощью «select», но я хотел бы, чтобы возвращался полный набор данных с измененными значениями конкретных столбцов, а не только отдельные столбцы. Я также знаю, что это возможно и просто с помощью метода withColumn, но это считается нетипизированным преобразованием. Чтобы сделать то же самое для типизированного преобразования и вернуть полный набор данных, я использую mapPartitions, но сталкиваюсь с проблемами:
case class Listing(street: String, zip: Int, price: Int)
val list = List(Listing("Main St", 92323, 30000), Listing("1st St", 94331, 10000),Listing("Sunset Ave", 98283, 50000))
val ds = sc.parallelize(list).toDS
val colNames = ds.columns
val newDS = ds.mapPartitions{ iter => val newDSIter =
for (row <- iter) yield {
val newRow = for (i <- 0 until ds.columns.length) yield {
if (some_condition) {
//using reflection to get field value since the column to be
//processed will be dynamically known based on if condition
val value = row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
//send 'value' to some function for processing and returning new value
} else {
//just return field value
row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
}
} newRow
}
newDSIter
}
Это дает мне следующую ошибку:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Я изменил следующую строку: newRow.as [Листинг]
который показывает ошибку
error: value as is not a member of scala.collection.immutable.IndexedSeq[String]
Это говорит мне, что объект Person не возвращается, а просто набор строк.
Является ли это правильным подходом к возврату полного набора данных после выполнения типизированного преобразования и потери типа в процессе, поскольку я получаю обратно коллекцию String вместо объекта Person?
Другой мой вопрос - это мое замешательство по поводу типизированных и нетипизированных преобразований. Если схема строго определена для DataFrame и над ней выполняется какое-то преобразование, почему это все еще считается нетипизированным преобразованием? Или, если метод withColumn вызывается для набора данных (вместо DataFrame) и возвращаемое значение, преобразованное в набор данных, по-прежнему считается нетипизированным преобразованием?
val newDS = ds.withColumn("zip", some_func).as[Listing]
который возвращает набор данных.
Редактировать:
Обновлена строка возврата строки (newRow) следующим образом:
Listing.getClass.getMethods.find(x => x.getName == "apply" && x.isBridge).get
.invoke(Listing, newRow map (_.asInstanceOf[AnyRef]): _*).asInstanceOf[Listing]
В spark-shell это возвращает Dataset [Listing], как мне нужно, но при компиляции кода с помощью sbt появляется ошибка:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.