Десериализация Spark 2.4.0 to_avro/from_avro не работает с Seq().toDF()

Я тестирую в Spark 2.4.0 новые функции from_avro и to_avro.

Я создаю фрейм данных только с одним столбцом и тремя строками, сериализую его с помощью avro и десериализую обратно из avro.

Если входной набор данных создается как

val input1 = Seq("foo", "bar", "baz").toDF("key")

+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+

десериализация просто возвращает N копий последней строки:

+---+
|key|
+---+
|baz|
|baz|
|baz|
+---+

Если я создам входной набор данных как

val input2 = input1.sqlContext.createDataFrame(input1.rdd, input1.schema)

результаты правильные:

+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+

Пример кода:

import org.apache.spark.sql.avro.{SchemaConverters, from_avro, to_avro}
import org.apache.spark.sql.DataFrame

val input1 = Seq("foo", "bar", "baz").toDF("key")
val input2 = input1.sqlContext.createDataFrame(input1.rdd, input1.schema)

def test_avro(df: DataFrame): Unit = {
  println("input df:")
  df.printSchema()
  df.show()

  val keySchema = SchemaConverters.toAvroType(df.schema).toString
  println(s"avro schema: $keySchema")

  val avroDf = df
    .select(to_avro($"key") as "key")

  println("avro serialized:")
  avroDf.printSchema()
  avroDf.show()

  val output = avroDf
    .select(from_avro($"key", keySchema) as "key")
    .select("key.*")

  println("avro deserialized:")
  output.printSchema()
  output.show()
}

println("############### testing .toDF()")
test_avro(input1)
println("############### testing .createDataFrame()")
test_avro(input2)

Результат:

############### testing .toDF()
input df:
root
 |-- key: string (nullable = true)

+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+

avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":["string","null"]}]}
avro serialized:
root
 |-- key: binary (nullable = true)

+----------------+
|             key|
+----------------+
|[00 06 66 6F 6F]|
|[00 06 62 61 72]|
|[00 06 62 61 7A]|
+----------------+

avro deserialized:
root
 |-- key: string (nullable = true)

+---+
|key|
+---+
|baz|
|baz|
|baz|
+---+

############### testing .createDataFrame()
input df:
root
 |-- key: string (nullable = true)

+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+

avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":["string","null"]}]}
avro serialized:
root
 |-- key: binary (nullable = true)

+----------------+
|             key|
+----------------+
|[00 06 66 6F 6F]|
|[00 06 62 61 72]|
|[00 06 62 61 7A]|
+----------------+

avro deserialized:
root
 |-- key: string (nullable = true)

+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+

Судя по тесту, проблема заключается в фазе десериализации, поскольку при печати сериализованного df avro отображаются разные строки.

Я делаю это неправильно или есть ошибка?


person redsk    schedule 05.12.2018    source источник


Ответы (1)


Похоже, это ошибка. Я подал отчет об ошибке, и теперь он исправлен в ветках 2.3 и 2.4.

person Yosuke Mori    schedule 21.06.2019
comment
Спасибо, я должен был сообщить об этом сам. - person redsk; 05.11.2019