Я пытаюсь отфильтровать Dask DataFrame
, а затем использовать map_partitions
для применения функции к каждому разделу. Функция ожидает панд DataFrame
как минимум с 1 строкой.
Вот код для генерации фиктивных данных в виде pandas
DataFrame
(а затем преобразования в Dask DataFrame
) для MCVE
def create_data(n):
df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
random_integers = np.random.default_rng().choice(14, size=n, replace=False)
df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
return df
df = create_data(n=10)
print(df.head(15))
>>>
store_id A
0 10 0.850730
1 10 0.581119
2 10 0.825802
3 10 0.657797
4 10 0.291961
5 10 0.864984
6 9 0.161334
7 9 0.397162
8 9 0.089300
9 9 0.435914
10 9 0.750741
11 9 0.920625
12 3 0.635727
13 3 0.425270
14 3 0.904043
Структура данных: для каждого store_id
ровно 6 строк.
Теперь я создаю список из некоторого количества store_id
, которые я хочу использовать для фильтрации вышеуказанных данных.
filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
print(filtered_store_ids)
>>> [13, 12, 11, 10, 9, 7]
Затем я конвертирую приведенные выше данные (панды DataFrame
) в dask.dataframe
ddf = dd.from_pandas(df, npartitions=10)
Теперь распечатываю разделы ddf
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=6
Partition Index=9, Number of Rows=6
Это ожидаемо. Каждый раздел состоит из 6 строк и одного (уникального) store_id
. Итак, каждый раздел содержит данные для одного store_id
.
Теперь я фильтрую фрейм данных Dask, используя список store_id
сверху
ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]
Опять печатаю разделы отфильтрованных ddf
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=0
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=0
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=0
Partition Index=9, Number of Rows=6
Это ожидается, поскольку каждый раздел имеет один store_id
, и при фильтрации некоторые разделы будут полностью отфильтрованы, и поэтому они будут содержать нулевые строки.
Итак, теперь я повторно разделю отфильтрованные Dataframe
на Рекомендации по использованию Dask DataFrame
ddf = ddf.repartition(npartitions=len(filtered_store_ids))
print(ddf)
>>>
Dask DataFrame Structure:
store_id A
npartitions=6
0 int64 float64
6 ... ...
... ... ...
48 ... ...
59 ... ...
Dask Name: repartition, 47 tasks
Я ожидал, что эта операция переразметки приведет к созданию непустых разделов одинакового размера. Но теперь, когда я повторно распечатываю разделы, я получаю результат, аналогичный предыдущему (неравные размеры разделов и некоторые пустые разделы), как будто повторного разделения не произошло
for p in range(ddf.npartitions):
print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=12
Partition Index=5, Number of Rows=6
Мой следующий шаг - применить функцию к каждому разделу после фильтрации, но это не сработает, поскольку есть некоторые разделы (pandas DataFrame
s), которые функция не может обработать, поскольку в них отсутствуют строки.
def myadd(df):
assert df.shape[0] > 0
...
return ...
ddf.map_partitions(myadd)
>>> AssertionError Traceback (most recent call last)
.
.
.
AssertionError:
Документация Dask для повторного разбиения на разделы: well- объяснил (то же самое для лучшей практики, которую я привел выше), и это кажется достаточно простым, но после повторного разбиения я все еще получаю некоторые разделы с нулевыми строками, и map_partitions
здесь не сработает. Я уверен, что здесь что-то упустил.
Есть несколько сообщений SO о повторном разделении (1, 2), но они не работают с пустыми разделами.
Вопрос
Есть ли способ гарантировать, что после повторного разбиения все разделы снова будут иметь 6 строк и не будут пустыми? т.е. возможно ли иметь повторно разбитый Dask DataFrame
с одинаковыми (непустыми) разделами?
ИЗМЕНИТЬ
Похоже, что пустые разделы не могут быть обработаны в Dask, на данный момент: проблемы 1, 2. Это может быть связано с проблемой, с которой я столкнулся.