Dask: установка индекса для большого фрейма данных приводит к большому использованию дискового пространства во время обработки

Я работаю с большим набором данных (220 000 000 строк, ~ 25 ГБ в виде файлов csv), который хранится в виде нескольких файлов csv.

Мне уже удалось прочитать эти csv с помощью Dask и сохранить данные в виде паркетного файла со следующим:

import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd
client = Client()

init_fields = {
# definition of csv fields
}

raw_data_paths = [
# filenames with their path
]

read_csv_kwargs = dict(
    sep=";",
    header=None,
    names=list(init_fields.keys()),      
    dtype=init_fields, 
    parse_dates=['date'],    
)

ddf = dd.read_csv(
    raw_data_paths,
    **read_csv_kwargs,
)
ddf.to_parquet(persist_path / 'raw_data.parquet')

Он работает как амулет и завершается в считанные минуты. У меня есть паркетный файл, содержащий Dask Dataframe с 455 разделами, который я могу полностью использовать.

Однако этот фреймворк состоит из огромного списка клиентских заказов, которые я хотел бы проиндексировать по дате для дальнейшей обработки.

Когда я пытаюсь запустить код с настройкой ниже:

ddf = dd.read_csv(
    raw_data_paths,
    **read_csv_kwargs,
).set_index('date')
ddf.to_parquet(persist_path / 'raw_data.parquet')

обработка становится очень долгой, с более чем 26 000 задач (я понимаю, это много данных для сортировки), но через некоторое время рабочие начинают умирать из-за использования большого количества памяти.

Панель управления Dask

С каждой смертью рабочего теряется некоторый прогресс, и кажется, что обработка никогда не завершится.

Я заметил, что смерть рабочих связана с тем, что диск моей машины достигает предела, и всякий раз, когда рабочий умирает, освобождается какое-то место. На момент начала обработки у меня около 37 Гб свободного места на диске.

Я новичок в Dask, поэтому у меня есть несколько вопросов по этому поводу:

  • Хорошая ли идея - установить индекс перед сбросом в паркетный файл? У меня есть несколько групповых дат для следующих шагов, и, согласно документации Dask, использование этого поля в качестве индекса показалось мне хорошей идеей.
  • Если мне удастся установить индекс перед выгрузкой как паркетный файл, будет ли паркетный файл отсортирован, и моя дальнейшая обработка не потребует более перетасовки?
  • Кажется ли вышеописанное поведение (чрезмерное использование диска из-за ошибки памяти) нормальным или есть что-то странное в моей настройке или использовании Dask? Есть ли какие-то параметры, которые я могу настроить?
  • Или мне действительно нужно больше места на диске, потому что этого требует сортировка большого количества данных? Какова будет оценка общего необходимого дискового пространства?

Заранее спасибо!

РЕДАКТИРОВАТЬ: мне наконец удалось установить индекс:

  • добавление дискового пространства на моей машине
  • настройка параметров клиента, чтобы у каждого рабочего было больше памяти

Я использовал следующие параметры:

client = Client(
    n_workers=1,
    threads_per_worker=8,
    processes=True,
    memory_limit='31GB'
)

Я менее уверен в том, что использование диска было основной причиной смерти моих рабочих из-за нехватки памяти, потому что увеличение дискового пространства само по себе не позволило завершить обработку. Также требовалось, чтобы память на каждого рабочего была расширена, чего я добился, создав одного рабочего со всей памятью моей машины.

Однако я очень удивлен, что потребовалось столько памяти. Я думал, что одной из целей Dask (и других инструментов для работы с большими данными) было обеспечение обработки вне ядра. Я что-то делаю здесь не так или для установки индекса требуется большой объем памяти, несмотря ни на что?

С уважением,


person Pierre Massé    schedule 02.03.2021    source источник


Ответы (1)


Вот как я понимаю вещи, но я могу упустить некоторые важные моменты.

Давайте начнем с хорошего индексированного набора данных, чтобы получить воспроизводимый пример.

import dask
import dask.dataframe as dd

df = dask.datasets.timeseries(start='2000-01-01', end='2000-01-2', freq='2h', partition_freq='12h')

print(len(df), df.npartitions)
# 12 2

Итак, мы имеем дело с крошечным набором данных, всего 12 строк, разделенных на 2 раздела. Поскольку этот фрейм данных индексирован, слияние с ним будет очень быстрым, потому что dask знает, какие разделы содержат какие (индексные) значения.

%%time
_ = df.merge(df, how='outer', left_index=True, right_index=True).compute()
#CPU times: user 25.7 ms, sys: 4.23 ms, total: 29.9 ms
#Wall time: 27.7 ms

Теперь, если мы попытаемся выполнить слияние в столбце без индекса, dask не будет знать, какой раздел содержит какие значения, поэтому ему придется обмениваться информацией между рабочими и передавать биты данных между рабочими.

%%time
_ = df.merge(df, how='outer', on=['name']).compute()
#CPU times: user 82.3 ms, sys: 8.19 ms, total: 90.4 ms
#Wall time: 85.4 ms

Это может показаться не таким уж большим для этого небольшого набора данных, но сравните это со временем, которое потребовалось бы pandas:

%%time
_ = df.compute().merge(df.compute(), how='outer', on=['name'])
#CPU times: user 18.9 ms, sys: 3.39 ms, total: 22.3 ms
#Wall time: 19.7 ms

Другой способ увидеть это - с помощью групп DAG, сравнить DAG для слияния с индексированными столбцами с DAG для слияния с неиндексированным столбцом. Первый хорошо параллелен:

Объединить с проиндексированным столбцом

Второй (с использованием неиндексированного столбца) намного сложнее:

Объединить с неиндексированным столбцом

Итак, что происходит с ростом размера данных, становится ли и дороже выполнять операции с неиндексированными столбцами. Это особенно верно для столбцов, содержащих много уникальных значений (например, строк). Вы можете поэкспериментировать с увеличением количества секций в построенном выше фрейме данных df, и вы увидите, как неиндексированный случай становится все более сложным, в то время как DAG для индексированных данных остается масштабируемым.

Возвращаясь к вашему конкретному случаю, вы начинаете с неиндексированного фрейма данных, который после индексации будет довольно сложной сущностью. Вы можете увидеть DAG для индексированного фрейма данных с .visualize(), и по опыту я могу предположить, что это будет выглядеть не очень красиво.

Поэтому, когда вы сохраняете на паркет (или инициируете другое вычисление фрейма данных), рабочие начинают перемешивать данные, что быстро съедает память (особенно если есть много столбцов и / или много разделов и / или столбцов много уникальные значения). Когда предел рабочей памяти близок, рабочие начнут сбрасывать данные на диск (если им это разрешено), поэтому вы смогли выполнить свою задачу, увеличив как память, так и доступное дисковое пространство.

В ситуации, когда ни один из этих вариантов невозможен, вам может потребоваться реализовать собственный рабочий процесс, который использует delayed API (или futures для динамических графиков), чтобы этот рабочий процесс использовал некоторую информацию, которая явно не доступна для dask. Например, если исходные файлы csv были разделены по интересующему столбцу, вы можете захотеть обработать эти файлы csv отдельными партиями, а не вставлять их в один фрейм данных dask и затем индексировать.

person SultanOrazbayev    schedule 05.03.2021
comment
Спасибо за Ваш ответ! Мне нравится, как вы объяснили, как индексированные данные полезны для дальнейшей обработки, и моя цель - создать и сохранить индекс. Однако этот ответ не отражает специфики моего вопроса. В моих файлах csv нет специального разделения на разделы, и для упрощения можно также считать, что у меня есть один огромный файл. Значит ли это, что Даск не может обработать такой большой файл, просто дав ему достаточно времени? - person Pierre Massé; 05.03.2021