Псевдонимы функций Spark - эффективные udfs

Контекст

Во многих SQL-запросах, которые я пишу, я обнаруживаю, что комбинирую предопределенные функции Spark точно таким же образом, что часто приводит к подробному и дублированному коду, и мой инстинкт разработчика состоит в том, чтобы захотеть его реорганизовать.

Итак, у меня такой вопрос: есть ли способ определить какой-то псевдоним для комбинаций функций, не прибегая к udfs (которых следует избегать по соображениям производительности) - цель состоит в том, чтобы сделать код более понятным и очиститель. По сути, я хочу что-то вроде udfs, но без потери производительности. Кроме того, эта функция ДОЛЖНА быть вызвана из запроса spark-sql, который можно использовать в spark.sql вызовах.

Пример

Например, предположим, что моя бизнес-логика состоит в том, чтобы перевернуть некоторую строку и хэшировать ее следующим образом: (обратите внимание, что комбинация функций здесь не имеет значения, важно то, что это некоторая комбинация существующих предварительно определенных функций искры - возможно, многие из их)

SELECT 
    sha1(reverse(person.name)),
    sha1(reverse(person.some_information)),
    sha1(reverse(person.some_other_information))
    ...
FROM person

Есть ли способ объявить функцию business без платы за производительность использования udf, позволяя переписать приведенный выше код как:

SELECT 
    business(person.name),
    business(person.some_information),
    business(person.some_other_information)
    ...
FROM person

Я довольно много поискал в документации по искрам и на этом веб-сайте и не нашел способа добиться этого, что довольно странно для меня, потому что это выглядит довольно естественной потребностью, и я не понимаю, почему вы должны обязательно заплатить цену черного ящика за определение и вызов udf.


person Thundzz    schedule 27.07.2019    source источник
comment
Думаю, вы ответили на свой вопрос.   -  person thebluephantom    schedule 28.07.2019


Ответы (1)


Есть ли способ объявить бизнес-функцию, не платя за производительность за счет использования udf

Вам не обязательно использовать udf, вы можете расширить класс Expression или для простейших операций - UnaryExpression. Затем вам нужно будет реализовать всего несколько методов, и поехали. Он изначально интегрирован в Spark, кроме того, что позволяет использовать некоторые преимущества, такие как генерация кода.

В вашем случае добавить функцию business довольно просто:

def business(column: Column): Column = {
  sha1(reverse(column))
}

ДОЛЖЕН быть вызван из запроса spark-sql, который можно использовать в вызовах spark.sql.

Это сложнее, но выполнимо.
Вам необходимо создать регистратор пользовательских функций:

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Expression 

object FunctionAliasRegistrar {

val funcs: mutable.Map[String, Seq[Column] => Column] = mutable.Map.empty

  def add(name: String, builder: Seq[Column] => Column): this.type = {
    funcs += name -> builder
    this
  }

  def registerAll(spark: SparkSession) = {
    funcs.foreach { case (alias, builder) => {
      def b(children: Seq[Expression]) = builder.apply(children.map(expr => new Column(expr))).expr
      spark.sessionState.functionRegistry.registerFunction(FunctionIdentifier(alias), b)
    }}
  }
}

Тогда вы можете использовать его следующим образом:

FunctionAliasRegistrar
  .add("business1", child => lower(reverse(child.head)))
  .add("business2", child => upper(reverse(child.head)))
  .registerAll(spark) 

dataset.createTempView("data")

spark.sql(
  """
    | SELECT business1(name), business2(name) FROM data
    |""".stripMargin)
.show(false)

Вывод:

+--------------------+--------------------+
|lower(reverse(name))|upper(reverse(name))|
+--------------------+--------------------+
|sined               |SINED               |
|taram               |TARAM               |
|1taram              |1TARAM              |
|2taram              |2TARAM              |
+--------------------+--------------------+

Надеюсь это поможет.

person Gelerion    schedule 28.07.2019
comment
Это действительно именно то, что мне нужно. Я только что протестировал его (искра 2.4.2), и похоже, что если вы хотите использовать конструктор класса Column вместо использования Column.apply в методе registerAll, тогда вам не нужно помещать FunctionRegistrarobject в org.apache.spark.sql! (Возможно, было бы неплохо обновить ответ этим изменением, так как оно делает все это не похожим на взлом, а больше на чистое расширение! Большое спасибо :) - person Thundzz; 28.07.2019
comment
Не заметил, обновлю ответ, спасибо. - person Gelerion; 28.07.2019
comment
Вторая часть хороша. - person thebluephantom; 28.07.2019