Сохранение разделов фрейма данных dask при загрузке нескольких файлов паркета

У меня есть данные временного ряда в кадрах данных со временем в качестве индекса. Индекс сортируется, и данные хранятся в нескольких файлах паркета с данными за один день в каждом файле. Использую dask 2.9.1

При загрузке данных из одного паркетного файла деление выставляется правильно.

Когда я загружаю данные из нескольких файлов, я не получаю изменений в итоговом фрейме данных dask.

Пример ниже иллюстрирует проблему:

import pandas as pd 
import pandas.util.testing as tm
import dask.dataframe as dd

df = tm.makeTimeDataFrame( 48, "H")

df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq", engine="fastparquet" ) 
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq", engine="fastparquet" )
ddf = dd.read_parquet( "df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet"  ) 
print(ddf.npartitions, ddf.divisions)

Здесь я получаю 2 раздела и (None, None, None) как подразделения

Могу ли я получить dd.read_parquet для установки фактических значений разделов?


Обновлять

По моим фактическим данным у меня есть одна паркетная папка в день.

Файлы создаются путем сохранения данных из фрейма данных, в котором метка времени используется в качестве индекса. Индекс отсортирован. Размер каждого файла составляет 100–150 МБ, и при загрузке в память он использует приложение 2,5 ГБ ОЗУ, активация индекса важна, поскольку воссоздание индекса очень сложно.

Мне не удалось найти комбинацию параметров или движка на read_parquet, которые заставляют его создавать разделение при загрузке.

Файлы данных называются "yyyy-mm-dd.parquet", поэтому я решил создать разделы на основе этой информации:

from pathlib import Path
files = list (Path("e:/data").glob("2019-06-*.parquet") )
divisions = [  pd.Timestamp( f.stem) for f in files ] + [ pd.Timestamp( files[-1].stem) + pd.Timedelta(1, unit='D' ) ]
ddf = dd.read_parquet( files )
ddf.divisions = divisions

Это не позволило использовать индекс, а в некоторых случаях не удалось с "TypeError: может только объединить кортеж (не" список ") в кортеж"

Затем я попытался установить деления как кортеж ddf.divisions = tuple(divisions), и тогда это сработало. При правильной настройке индекса dask работает впечатляюще быстро


Обновление 2

Лучший способ - прочитать кадры данных dask по отдельности, а затем объединить их:

from pathlib import Path
import dask.dataframe as dd
files = list (Path("e:/data").glob("2019-06-*.parquet") )
ddfs = [ dd.read_parquet( f ) for f in files ]
ddf = dd.concat(ddfs, axis=0)

Таким образом задаются деления, а также решается другая проблема обработки добавления столбцов с течением времени.


person PerJensen    schedule 02.01.2020    source источник
comment
Каковы значения разделений для двух дел по одному делу: перекрываются ли они?   -  person mdurant    schedule 06.01.2020
comment
В делениях не должно быть перекрытий. Я обновил вопрос рабочим решением, но могут быть решения получше   -  person PerJensen    schedule 09.01.2020


Ответы (1)


Ниже я переписал исходный вопрос, чтобы использовать concat, что решило мою проблему.

import pandas as pd 
import pandas.util.testing as tm
import dask.dataframe as dd

# create two example parquet files
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq" ) 
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq" )

# read the files and concatenate
ddf = dd.concat([dd.read_parquet( d ) for d in ["df1d.parq", "df2d.parq"] ], axis=0)

print(ddf.npartitions, ddf.divisions)

Я все еще получаю ожидаемые 2 раздела, но теперь деления (Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-02 23:00:00'))

person PerJensen    schedule 13.01.2020