Выбор набора данных Spark с типизированным столбцом

Глядя на функцию select() в Spark DataSet, можно увидеть различные сгенерированные сигнатуры функций:

(c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....)

Кажется, это намекает на то, что я должен иметь возможность напрямую ссылаться на членов MyClass и быть безопасным по типу, но я не уверен, как ...

ds.select("member") конечно работает .. кажется, ds.select(_.member) тоже может как-то работать?


person Jeremy    schedule 28.07.2016    source источник


Ответы (2)


В Scala DSL для select есть много способов идентифицировать Column:

  • Из символа: 'name
  • Из строки: $"name" или col(name)
  • Из выражения: expr("nvl(name, 'unknown') as renamed")

Чтобы получить TypedColumn от Column, просто используйте myCol.as[T].

Например: ds.select(col("name").as[String])

person Sim    schedule 28.07.2016
comment
этот ответ правильный, однако имейте в виду, что this as [T] небезопасен по типу, поэтому он может взорваться в RT, если принять неправильный тип. - person linehrr; 11.12.2018
comment
Хорошая точка зрения. Чтобы получить максимальную помощь от компилятора, вам нужно полностью переключиться на типы Scala, например, ds.as[T].map { t: T => ... }. Обратите внимание, что будет стоимость преобразования данных, поскольку Spark внутренне использует необработанные двоичные данные, а не типы Scala. - person Sim; 12.12.2018

Если вам нужен эквивалент ds.select(_.member), просто используйте map:

case class MyClass(member: MyMember, foo: A, bar: B)
val ds: DataSet[MyClass] = ???
val members: DataSet[MyMember] = ds.map(_.member)

Изменить: аргумент в пользу отказа от использования map.

Более эффективный способ сделать то же самое - через проекцию, а не использовать map вообще. Вы теряете проверку типов во время компиляции, но взамен даете механизму запросов Catalyst возможность сделать что-то более оптимизированное. Как намекает @Sim в своем комментарии ниже, основная оптимизация сосредоточена вокруг того, чтобы не требовать десериализации всего содержимого MyClass из пространства памяти Tungsten в память кучи JVM - просто для вызова средства доступа - а затем сериализовать результат _.member обратно в Вольфрам.

Чтобы сделать более конкретный пример, давайте переопределим нашу модель данных следующим образом:

  // Make sure these are not nested classes 
  // (i.e. in a top level compilation units).
  case class MyMember(something: Double)
  case class MyClass(member: MyMember, foo: Int, bar: String)

Это должны быть case классы, чтобы SQLImplicits.newProductEncoder[T <: Product] мог предоставить нам неявный Encoder[MyClass], требуемый Dataset[T] API.

Теперь мы можем сделать приведенный выше пример более конкретным:

  val ds: Dataset[MyClass] = Seq(MyClass(MyMember(1.0), 2, "three")).toDS()
  val membersMapped: Dataset[Double] = ds.map(_.member.something)

Чтобы увидеть, что происходит за кулисами, мы используем метод explain():

membersMapped.explain()

== Physical Plan ==
*(1) SerializeFromObject [input[0, double, false] AS value#19]
+- *(1) MapElements <function1>, obj#18: double
   +- *(1) DeserializeToObject newInstance(class MyClass), obj#17: MyClass
      +- LocalTableScan [member#12, foo#13, bar#14]

Это делает сериализацию в / из Tungsten явно очевидной.

Получим то же значение с помощью проекции [^ 1]:

val ds2: Dataset[Double] = ds.select($"member.something".as[Double])
ds2.explain()

== Physical Plan ==
LocalTableScan [something#25]

Вот и все! Один шаг [^ 2]. Никакой сериализации, кроме кодирования MyClass в исходный набор данных.

[^ 1]: Причина, по которой проекция определяется как $"member.something", а не $"value.member.something", связана с тем, что Catalyst автоматически проецирует элементы одного столбца DataFrame.

[^ 2]: Честно говоря, * рядом с шагами в первом физическом плане означает, что они будут реализованы WholeStageCodegenExec, в результате чего эти шаги станут единой, скомпилированной на лету функцией JVM, имеющей собственный набор времени выполнения. к нему применены оптимизации. Поэтому на практике вам придется эмпирически проверить производительность, чтобы действительно оценить преимущества каждого подхода.

person metasim    schedule 11.05.2017
comment
Обратите внимание, что будет стоимость преобразования данных, поскольку Spark внутренне использует необработанные двоичные данные, а не типы Scala. - person Sim; 12.12.2018
comment
В чем преимущество использования набора данных в этом случае? Это просто компромисс между безопасностью типов перед производительностью? Я совершенно не понимаю, когда Dataset будет полезен! - person Aravind Yarram; 14.04.2020
comment
В большинстве случаев вам просто нужно использовать Dataframe. Иногда для взаимодействия с другими функциями вам может потребоваться войти в пространство DataSet, чтобы иметь возможность вызывать map, flatMap и т. Д. Без создания UDF. Или какой-нибудь другой побочный случай. - person metasim; 14.04.2020