Невозможно работать с нестандартным типом после его кодирования? Набор данных Spark

Скажем, у вас есть это (решение для кодирования нестандартного типа взято из этого потока):

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

Когда делаю ds.show, я получил:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

Я понимаю, что это потому, что содержимое закодировано во внутреннем двоичном представлении Spark SQL. Но как я могу отобразить декодированный контент таким образом?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

ОБНОВЛЕНИЕ1

Отображение контента - не самая большая проблема, более важно то, что это может привести к проблемам при обработке набора данных, рассмотрим этот пример:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value); 

Означает ли это, что kryo-кодированный тип не может удобно выполнять операции, подобные joinWith?

Как мы обрабатываем нестандартный тип на Dataset тогда?
Если мы не можем обработать его после кодирования, в чем смысл этого решения kryo кодирования для нестандартного типа ?!

(Решение, предоставленное @jacek ниже, полезно для типа case class, но оно по-прежнему не может декодировать пользовательский тип)


person jack    schedule 03.10.2020    source источник


Ответы (1)


Следующее сработало для меня, но похоже на использование высокоуровневого API для низкоуровневой (десериализации) работы.

Это не значит, что так нужно делать, но показывает, что это возможно.

Я не знаю, почему KryoDeserializer не выполняет десериализацию байтов в объект, из которого были получены байты. Это просто так.

Одно из основных различий между вашим определением класса и моим - это case, которое позволило мне использовать следующий трюк. Опять же, точно не знаю, почему это стало возможным.

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] => 
  import java.nio.ByteBuffer
  ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
person Jacek Laskowski    schedule 03.10.2020
comment
Спасибо за ответ. Означает ли это, что каждый раз при отображении набора данных (пользовательского типа, например, определяемого case class) или записи на диск, мы должны сначала выполнить ks.newInstance.deserialize? Это сильно сбивает меня с толку относительно использования кодировщиков в Dataset. - person jack; 03.10.2020
comment
Не уверен, что я правильно вас понял, но с тех пор, как вы использовали kryo, все немного изменилось. Придерживайтесь case class и import spark.implicits._, и все будет в порядке. Другими словами, почему вы подумали о Encoders.kryo? Я редко вижу его применение. - person Jacek Laskowski; 03.10.2020
comment
Для типа, не входящего в список предопределенных кодировщиков, it кажется, мы должны использовать kryo, не так ли? Если мы его используем, ds находится в двоичном формате, а не в табличном, что предотвращает дальнейшее преобразование и обработку данных (представьте, что вы хотите сделать простой joinWith в столбце i с другим ds, сначала вам нужно сделать ks.newInstance.deserialize, что действительно неудобно ). - person jack; 04.10.2020
comment
Для неподдерживаемых типов вам необходимо преобразовать их в те, которые поддерживаются. Нет нужды в крио. - person Jacek Laskowski; 04.10.2020
comment
Фактически, когда я вставил ваш код в искровую оболочку, я получил эту ошибку (Spark 3.0.1) на этапе .asInstanceOf[MyObj]: java.lang.UnsupportedOperationException: Schema for type MyObj is not supported at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69) - person jack; 09.10.2020
comment
@jack Вы могли пропустить это // Note that case keyword в моем коде, не так ли? - person Jacek Laskowski; 09.10.2020
comment
Это правда! Но тогда это не то решение, которое я ищу, потому что я бы не стал кодировать объект case class с помощью kryo, поскольку import spark.implicits._ я уже получил преимущества кодировщиков Spark по умолчанию. Я ищу решение для декодирования / десериализации пользовательского типа, закодированного kryo - person jack; 09.10.2020
comment
Я не могу представить, как я могу работать с закодированными пользовательскими объектами, если я не десериализую их. Или скажите, какой смысл использовать kryo, если я не могу декодировать обратно? - person jack; 09.10.2020