Как создать коллекцию Encoder for Scala (для реализации пользовательского агрегатора)?

Искра 2.3.0 со Скала 2.11. Я реализую пользовательский Aggregator в соответствии с документами здесь. Агрегатор требует 3 типа для ввода, буфера и вывода.

Мой агрегатор должен воздействовать на все предыдущие строки в окне, поэтому я объявил его так:

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}

Предполагается, что один из методов переопределения возвращает кодировщик для типа буфера, в данном случае это ListBuffer. Я не могу найти подходящий кодировщик для org.apache.spark.sql.Encoders или какой-либо другой способ закодировать это, поэтому я не знаю, что здесь вернуть.

Я подумал о создании нового класса case, который имеет одно свойство типа ListBuffer[Foo], и использовать его в качестве моего класса буфера, а затем использовать для него Encoders.product, но я не уверен, что это необходимо или есть что-то еще, что мне не хватает. Спасибо за любые советы.


person Uncle Long Hair    schedule 06.04.2018    source источник
comment
Просто анализирую то, что вы сказали, чтобы лучше понять ситуацию. Мой агрегатор должен воздействовать на все предыдущие строки в окне, поэтому я объявил его следующим образом: имеет ли какое-то значение, воздействует ли ваш агрегатор на предыдущие строки или любые строки? Просто интересно, насколько это важно.   -  person Jacek Laskowski    schedule 04.05.2018
comment
В моем случае использования скажем, что есть 5 строк, результат для строки 1 зависит от строки 1, результат для строки 2 зависит от строк 1 и 2, результат для строки 3 зависит от 1-3 и т. д. Это зависит от конкретной сортировки строк. Если бы каждая строка зависела от всех других строк, я думаю, мне пришлось бы сделать это за 2 прохода: сначала собрать все значения для окна с помощью collect_list или collect_set, а другой — для вычисления агрегированных значений.   -  person Uncle Long Hair    schedule 08.05.2018
comment
Разве это объединение окон и требование предыдущих строк не является спецификацией окна?   -  person Jacek Laskowski    schedule 08.05.2018


Ответы (2)


Вы должны просто позволить Spark SQL сделать свою работу и найти правильный кодировщик, используя ExpressionEncoder следующим образом:

scala> spark.version
res0: String = 2.3.0

case class Mod(id: Long)

import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
person Jacek Laskowski    schedule 04.05.2018
comment
Спасибо, это удивительно просто и, кажется, работает. Кодировщик, созданный неявно, не тот, который я мог бы создать явно. - person Uncle Long Hair; 10.05.2018

Я не вижу в org.apache.spark.sql.Encoders ничего, что можно было бы использовать для прямого кодирования ListBuffer или, если уж на то пошло, даже List

Один из вариантов, похоже, заключается в том, чтобы поместить его в класс case, как вы предложили:

import org.apache.spark.sql.Encoders

case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]

Другим вариантом может быть использование крио:

Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]

Или, наконец, вы можете взглянуть на ExpressionEncoders, которые расширяют Encoder:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]

Это лучшее решение, так как оно делает все прозрачным для катализатора и, следовательно, позволяет ему выполнять все замечательные оптимизации.

Во время игры я заметил одну вещь:

ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema

Я не тестировал ничего из вышеперечисленного при выполнении агрегатов, поэтому могут возникнуть проблемы во время выполнения. Надеюсь, это полезно.

person user2682459    schedule 08.04.2018