Типизированное преобразование наборов данных с помощью mapPartitions

Я хочу выполнить типизированное преобразование, чтобы заменить все значения некоторых столбцов в наборе данных. Я знаю, что это возможно с помощью «select», но я хотел бы, чтобы возвращался полный набор данных с измененными значениями конкретных столбцов, а не только отдельные столбцы. Я также знаю, что это возможно и просто с помощью метода withColumn, но это считается нетипизированным преобразованием. Чтобы сделать то же самое для типизированного преобразования и вернуть полный набор данных, я использую mapPartitions, но сталкиваюсь с проблемами:

case class Listing(street: String, zip: Int, price: Int)
val list = List(Listing("Main St", 92323, 30000), Listing("1st St", 94331, 10000),Listing("Sunset Ave", 98283, 50000))
val ds = sc.parallelize(list).toDS
val colNames = ds.columns

val newDS = ds.mapPartitions{ iter => val newDSIter = 
    for (row <- iter) yield {
      val newRow = for (i <- 0 until ds.columns.length) yield {
        if (some_condition) { 
          //using reflection to get field value since the column to be
          //processed will be dynamically known based on if condition 
          val value = row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
          //send 'value' to some function for processing and returning new value
        } else { 
          //just return field value
          row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
        }
      } newRow
    } 
 newDSIter
}

Это дает мне следующую ошибку:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

Я изменил следующую строку: newRow.as [Листинг]

который показывает ошибку

error: value as is not a member of scala.collection.immutable.IndexedSeq[String]

Это говорит мне, что объект Person не возвращается, а просто набор строк.

Является ли это правильным подходом к возврату полного набора данных после выполнения типизированного преобразования и потери типа в процессе, поскольку я получаю обратно коллекцию String вместо объекта Person?

Другой мой вопрос - это мое замешательство по поводу типизированных и нетипизированных преобразований. Если схема строго определена для DataFrame и над ней выполняется какое-то преобразование, почему это все еще считается нетипизированным преобразованием? Или, если метод withColumn вызывается для набора данных (вместо DataFrame) и возвращаемое значение, преобразованное в набор данных, по-прежнему считается нетипизированным преобразованием?

val newDS = ds.withColumn("zip", some_func).as[Listing]

который возвращает набор данных.

Редактировать:

Обновлена ​​строка возврата строки (newRow) следующим образом:

Listing.getClass.getMethods.find(x => x.getName == "apply" && x.isBridge).get
.invoke(Listing, newRow map (_.asInstanceOf[AnyRef]): _*).asInstanceOf[Listing]

В spark-shell это возвращает Dataset [Listing], как мне нужно, но при компиляции кода с помощью sbt появляется ошибка:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

person ussd84    schedule 13.12.2017    source источник


Ответы (1)


Мне удалось решить эту проблему, сначала преобразовав коллекцию в класс case (см. «Правка») и обеспечив импорт spark.implicits_.

person ussd84    schedule 14.12.2017