Я работаю с большим набором данных (220 000 000 строк, ~ 25 ГБ в виде файлов csv), который хранится в виде нескольких файлов csv.
Мне уже удалось прочитать эти csv с помощью Dask и сохранить данные в виде паркетного файла со следующим:
import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd
client = Client()
init_fields = {
# definition of csv fields
}
raw_data_paths = [
# filenames with their path
]
read_csv_kwargs = dict(
sep=";",
header=None,
names=list(init_fields.keys()),
dtype=init_fields,
parse_dates=['date'],
)
ddf = dd.read_csv(
raw_data_paths,
**read_csv_kwargs,
)
ddf.to_parquet(persist_path / 'raw_data.parquet')
Он работает как амулет и завершается в считанные минуты. У меня есть паркетный файл, содержащий Dask Dataframe с 455 разделами, который я могу полностью использовать.
Однако этот фреймворк состоит из огромного списка клиентских заказов, которые я хотел бы проиндексировать по дате для дальнейшей обработки.
Когда я пытаюсь запустить код с настройкой ниже:
ddf = dd.read_csv(
raw_data_paths,
**read_csv_kwargs,
).set_index('date')
ddf.to_parquet(persist_path / 'raw_data.parquet')
обработка становится очень долгой, с более чем 26 000 задач (я понимаю, это много данных для сортировки), но через некоторое время рабочие начинают умирать из-за использования большого количества памяти.
С каждой смертью рабочего теряется некоторый прогресс, и кажется, что обработка никогда не завершится.
Я заметил, что смерть рабочих связана с тем, что диск моей машины достигает предела, и всякий раз, когда рабочий умирает, освобождается какое-то место. На момент начала обработки у меня около 37 Гб свободного места на диске.
Я новичок в Dask, поэтому у меня есть несколько вопросов по этому поводу:
- Хорошая ли идея - установить индекс перед сбросом в паркетный файл? У меня есть несколько групповых дат для следующих шагов, и, согласно документации Dask, использование этого поля в качестве индекса показалось мне хорошей идеей.
- Если мне удастся установить индекс перед выгрузкой как паркетный файл, будет ли паркетный файл отсортирован, и моя дальнейшая обработка не потребует более перетасовки?
- Кажется ли вышеописанное поведение (чрезмерное использование диска из-за ошибки памяти) нормальным или есть что-то странное в моей настройке или использовании Dask? Есть ли какие-то параметры, которые я могу настроить?
- Или мне действительно нужно больше места на диске, потому что этого требует сортировка большого количества данных? Какова будет оценка общего необходимого дискового пространства?
Заранее спасибо!
РЕДАКТИРОВАТЬ: мне наконец удалось установить индекс:
- добавление дискового пространства на моей машине
- настройка параметров клиента, чтобы у каждого рабочего было больше памяти
Я использовал следующие параметры:
client = Client(
n_workers=1,
threads_per_worker=8,
processes=True,
memory_limit='31GB'
)
Я менее уверен в том, что использование диска было основной причиной смерти моих рабочих из-за нехватки памяти, потому что увеличение дискового пространства само по себе не позволило завершить обработку. Также требовалось, чтобы память на каждого рабочего была расширена, чего я добился, создав одного рабочего со всей памятью моей машины.
Однако я очень удивлен, что потребовалось столько памяти. Я думал, что одной из целей Dask (и других инструментов для работы с большими данными) было обеспечение обработки вне ядра. Я что-то делаю здесь не так или для установки индекса требуется большой объем памяти, несмотря ни на что?
С уважением,