ValueError: невозможно переиндексировать с повторяющейся оси с использованием Dask DataFrame

Я пытался адаптировать свой код для использования Dask для использования нескольких машин для обработки. Хотя начальная загрузка данных не требует много времени, последующая обработка занимает около 12 часов на 8-ядерном i5. Это не идеально, и было решено, что использование Dask для распределения обработки между машинами будет полезным. Следующий код отлично работает со стандартным подходом Pandas:

import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype("str")

artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip()
)

Преобразование в Dask показалось простым, но по ходу дела у меня возникают проблемы. Следующий адаптированный код Dask выдает ValueError: cannot reindex from a duplicate axis ошибку:

import dask.dataframe as dd
from dask.distributed import Client

artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip().compute()
)

if __name__ == '__main__':
    client = Client()

Лучшее, что я могу понять, это то, что Dask не позволяет переназначить существующий Dask DataFrame. Итак, это работает:

...
artists_new = artists["name"].astype("str").compute()
...

Однако я действительно не хочу каждый раз создавать новый DataFrame. Я бы предпочел заменить существующий DataFrame новым, главным образом потому, что у меня есть несколько этапов очистки данных перед обработкой.

Хотя учебник и руководства полезны, они довольно простые и не охватывают такие варианты использования.

Каковы предпочтительные подходы к Dask DataFrames?


person SimplicityGuy    schedule 12.06.2021    source источник


Ответы (1)


Каждый раз, когда вы вызываете .compute() в фрейме данных / серии Dask, он преобразует их в pandas. Итак, что происходит в этой строке

художники [имя] = художники [имя] .astype (str) .compute ()

заключается в том, что вы вычисляете строковый столбец, а затем назначаете серию pandas серии dask (без обеспечения выравнивания разделов). Решение состоит в том, чтобы вызывать .compute() только для окончательного результата, в то время как промежуточные шаги могут использовать обычный синтаксис pandas:

# modified example (.compute is removed)
artists["name"] = artists["name"].astype(str).str.lower()
person SultanOrazbayev    schedule 13.06.2021
comment
Отлично, спасибо, что указали на это. Я прошел мимо этого, теперь перейдя к другим отличиям. Это очень помогает, еще раз спасибо. - person SimplicityGuy; 13.06.2021
comment
Пожалуйста! - person SultanOrazbayev; 13.06.2021