Я пытался адаптировать свой код для использования Dask для использования нескольких машин для обработки. Хотя начальная загрузка данных не требует много времени, последующая обработка занимает около 12 часов на 8-ядерном i5. Это не идеально, и было решено, что использование Dask для распределения обработки между машинами будет полезным. Следующий код отлично работает со стандартным подходом Pandas:
import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype("str")
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip()
)
Преобразование в Dask показалось простым, но по ходу дела у меня возникают проблемы. Следующий адаптированный код Dask выдает ValueError: cannot reindex from a duplicate axis
ошибку:
import dask.dataframe as dd
from dask.distributed import Client
artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")
artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
artists["name"]
.str.encode("ascii", "ignore")
.str.decode("ascii")
.str.lower()
.str.replace("&", " and ", regex=False)
.str.strip().compute()
)
if __name__ == '__main__':
client = Client()
Лучшее, что я могу понять, это то, что Dask не позволяет переназначить существующий Dask DataFrame. Итак, это работает:
...
artists_new = artists["name"].astype("str").compute()
...
Однако я действительно не хочу каждый раз создавать новый DataFrame. Я бы предпочел заменить существующий DataFrame новым, главным образом потому, что у меня есть несколько этапов очистки данных перед обработкой.
Хотя учебник и руководства полезны, они довольно простые и не охватывают такие варианты использования.
Каковы предпочтительные подходы к Dask DataFrames?