У меня есть данные временного ряда в кадрах данных со временем в качестве индекса. Индекс сортируется, и данные хранятся в нескольких файлах паркета с данными за один день в каждом файле. Использую dask 2.9.1
При загрузке данных из одного паркетного файла деление выставляется правильно.
Когда я загружаю данные из нескольких файлов, я не получаю изменений в итоговом фрейме данных dask.
Пример ниже иллюстрирует проблему:
import pandas as pd
import pandas.util.testing as tm
import dask.dataframe as dd
df = tm.makeTimeDataFrame( 48, "H")
df1 = df[:24].sort_index()
df2 = df[24:].sort_index()
dd.from_pandas( df1, npartitions=1 ).to_parquet( "df1d.parq", engine="fastparquet" )
dd.from_pandas( df2, npartitions=1 ).to_parquet( "df2d.parq", engine="fastparquet" )
ddf = dd.read_parquet( "df*d.parq", infer_divisions=True, sorted_index=True, engine="fastparquet" )
print(ddf.npartitions, ddf.divisions)
Здесь я получаю 2 раздела и (None, None, None)
как подразделения
Могу ли я получить dd.read_parquet для установки фактических значений разделов?
Обновлять
По моим фактическим данным у меня есть одна паркетная папка в день.
Файлы создаются путем сохранения данных из фрейма данных, в котором метка времени используется в качестве индекса. Индекс отсортирован. Размер каждого файла составляет 100–150 МБ, и при загрузке в память он использует приложение 2,5 ГБ ОЗУ, активация индекса важна, поскольку воссоздание индекса очень сложно.
Мне не удалось найти комбинацию параметров или движка на read_parquet, которые заставляют его создавать разделение при загрузке.
Файлы данных называются "yyyy-mm-dd.parquet", поэтому я решил создать разделы на основе этой информации:
from pathlib import Path
files = list (Path("e:/data").glob("2019-06-*.parquet") )
divisions = [ pd.Timestamp( f.stem) for f in files ] + [ pd.Timestamp( files[-1].stem) + pd.Timedelta(1, unit='D' ) ]
ddf = dd.read_parquet( files )
ddf.divisions = divisions
Это не позволило использовать индекс, а в некоторых случаях не удалось с "TypeError: может только объединить кортеж (не" список ") в кортеж"
Затем я попытался установить деления как кортеж ddf.divisions = tuple(divisions)
, и тогда это сработало. При правильной настройке индекса dask работает впечатляюще быстро
Обновление 2
Лучший способ - прочитать кадры данных dask по отдельности, а затем объединить их:
from pathlib import Path
import dask.dataframe as dd
files = list (Path("e:/data").glob("2019-06-*.parquet") )
ddfs = [ dd.read_parquet( f ) for f in files ]
ddf = dd.concat(ddfs, axis=0)
Таким образом задаются деления, а также решается другая проблема обработки добавления столбцов с течением времени.