Чтение / запись паркета с типом столбца Struct

Я пытаюсь написать такой Dataframe в Parquet:

| foo | bar               |
|-----|-------------------|
|  1  | {"a": 1, "b": 10} |
|  2  | {"a": 2, "b": 20} |
|  3  | {"a": 3, "b": 30} |

Я делаю это с помощью Pandas и Fastparquet:

df = pd.DataFrame({
    "foo": [1, 2, 3],
    "bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})

import fastparquet
fastparquet.write('/my/parquet/location/toy-fastparquet.parq', df)

Я хотел бы загрузить Parquet в (py) Spark и запросить данные с помощью Spark SQL, например:

df = spark.read.parquet("/my/parquet/location/")
df.registerTempTable('my_toy_table')
result = spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 15")

Моя проблема в том, что, хотя fastparquet может правильно читать свой файл Parquet (поле bar правильно десериализовано как Struct), в Spark, bar читается как столбец типа String, который просто содержит JSON-представление исходной структуры:

In [2]: df.head()                                                                                                                                                                                           
Out[2]: Row(foo=1, bar='{"a": 1, "b": 10}')

Я пробовал писать Parquet из PyArrow, но безуспешно: ArrowNotImplementedError: Level generation for Struct not supported yet. Я также пробовал передать file_scheme='hive' в Fastparquet, но получил те же результаты. Изменение сериализации Fastparquet на BSON (object_encoding='bson') привело к нечитаемому двоичному полю.

[EDIT] я вижу следующие подходы:

  • [ответил] Написать Parquet из Spark
  • [open] Найдите библиотеку Python, которая реализует Спецификация Parquet для вложенных типов, и это совместимо с тем, как Spark их читает.
  • [ответил] Прочтите файлы Fastparquet в Spark с определенной десериализацией JSON (я полагаю, это влияет на производительность)
  • Не используйте полностью вложенные структуры

person Dario Chi    schedule 14.02.2020    source источник
comment
На данный момент это действительно ограничение Arrow, см. issues.apache.org/jira/browse / ARROW-1644   -  person joris    schedule 14.02.2020
comment
Спасибо @joris, мой DF не содержит смесь списка и структуры, только поле структуры (я сделал описание более понятным). Однако похоже, что и этот случай на данный момент не поддерживается.   -  person Dario Chi    schedule 14.02.2020
comment
Вы пробовали передавать schema при загрузке данных?   -  person Cesar A. Mostacero    schedule 14.02.2020
comment
@ cesar-a-mostacero Я пробовал, но это не сработало, потому что мне не хватало декодирования JSON, которое Александрос объяснил в ответе ниже   -  person Dario Chi    schedule 18.02.2020


Ответы (1)


Здесь у вас есть как минимум 3 варианта:

Вариант 1:

Вам не нужно использовать какие-либо дополнительные библиотеки, такие как fastparquet, поскольку Spark уже предоставляет эту функцию:

pdf = pd.DataFrame({
    "foo": [1, 2, 3],
    "bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})

df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")

Если попытаться загрузить данные с помощью df = spark.read.parquet("/tmp/parquet1"), схема будет такой:

StructType([ 
            StructField("foo", LongType(), True),
            StructField("bar",MapType(StringType(), LongType(), True), True)])

Как видите, в этом случае Spark сохранит правильную схему.

Вариант 2:

Если по какой-либо причине по-прежнему необходимо использовать fastparquet, то bar будет рассматриваться как строка, поэтому вы можете загрузить bar как строку, а затем преобразовать ее в JSON, используя from_json. В вашем случае мы будем обрабатывать json как словарь Map (string, int). Это сделано для нашего удобства, поскольку данные кажутся последовательностью ключ / значение, которая может быть прекрасно представлена ​​словарем:

from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json

df = spark.read.parquet("/tmp/parquet1")

# schema should be a Map(string, string) 
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()

# +---+-----------------+
# |foo|              bar|
# +---+-----------------+
# |  1|[a -> 1, b -> 10]|
# |  2|[a -> 2, b -> 20]|
# |  3|[a -> 3, b -> 30]|
# +---+-----------------+

Вариант 3:

Если ваша схема не меняется и вы знаете, что каждое значение bar всегда будет иметь одинаковую комбинацию полей (a, b), вы также можете преобразовать bar в структуру:

schema = StructType([ 
                    StructField("a", LongType(), True),
                    StructField("b", LongType(), True)
            ])

df = df.withColumn("bar", from_json("bar", schema))

df.printSchema()

# root
#  |-- foo: long (nullable = true)
#  |-- bar: struct (nullable = true)
#  |    |-- a: long (nullable = true)
#  |    |-- b: long (nullable = true)

Пример:

Затем вы можете запустить свой код с помощью:

df.registerTempTable('my_toy_table')

spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")

# +---+-----------------+
# |foo|              bar|
# +---+-----------------+
# |  3|[a -> 3, b -> 30]|
# +---+-----------------+
person abiratsis    schedule 14.02.2020
comment
id, вероятно, пойдет с вариантом 2, потому что он соответствует тому, что OP пытается выполнить с помощью функции pyspark from_json - person murtihash; 17.02.2020
comment
Спасибо @ alexandros-biratsis за отличный ответ! Вариант 1 был бы идеальным, но, к сожалению, производитель Parquet не использует Spark. Таким образом, похоже, что Fastparquet не реализует собственную спецификацию Map для Parquet (github.com/apache/parquet-format/blob/master/); вместо этого они сериализуются в JSON, и мне нужно десериализовать в Spark ... Интересно, существует ли другая библиотека, которая это делает. Если нет, я согласен, что вариант 2 - лучший вариант. - person Dario Chi; 17.02.2020
comment
Привет, Дарио, к сожалению, я действительно не знаю, существует ли такая библиотека, поддерживающая записи структур. - person abiratsis; 17.02.2020