Использование set_index () в Dask Dataframe и запись в паркет вызывает взрыв памяти

У меня есть большой набор файлов Parquet, которые я пытаюсь отсортировать по столбцу. В несжатом виде данные составляют около 14 ГБ, поэтому Dask казался подходящим инструментом для работы. Все, что я делаю с Dask, это:

  1. Чтение паркетных файлов
  2. Сортировка по одному из столбцов (называется другом)
  3. Запись паркетных файлов в отдельный каталог

Я не смогу сделать это без процесса Dask (есть только один, я использую синхронный планировщик), у которого не хватает памяти и его убивают. Это меня удивляет, потому что ни один раздел не имеет размера более ~ 300 мб без сжатия.

Я написал небольшой скрипт для профилирования Dask с постепенно увеличивающимися частями моего набора данных, и я заметил, что потребление памяти Dask масштабируется с размером ввода. Вот сценарий:

import os
import dask
import dask.dataframe as dd
from dask.diagnostics import ResourceProfiler, ProgressBar

def run(input_path, output_path, input_limit):
    dask.config.set(scheduler="synchronous")

    filenames = os.listdir(input_path)
    full_filenames = [os.path.join(input_path, f) for f in filenames]

    rprof = ResourceProfiler()
    with rprof, ProgressBar():
        df = dd.read_parquet(full_filenames[:input_limit])
        df = df.set_index("friend")
        df.to_parquet(output_path)

    rprof.visualize(file_path=f"profiles/input-limit-{input_limit}.html")

Вот диаграммы, созданные вызовом visualize():

Предел ввода = 2

С input_limit = 2

Предел ввода = 4

С input_limit = 4

Предел ввода = 8

С input_limit = 8

Предел ввода = 16

С input_limit = 16

Полный набор данных составляет ~ 50 входных файлов, поэтому при таких темпах роста я не удивлен, что работа съедает всю память на моей машине 32 ГБ.

Насколько я понимаю, вся суть Dask заключается в том, чтобы позволить вам работать с наборами данных размером больше памяти. У меня сложилось впечатление, что люди используют Dask для обработки наборов данных, намного превышающих мои ~ 14 ГБ. Как они могут избежать этой проблемы с масштабированием потребления памяти? Что я здесь делаю не так?

На данный момент меня не интересует использование другого планировщика или параллелизма. Я просто хотел бы знать, почему Dask потребляет намного больше памяти, чем я мог подумать.


person Sinclair    schedule 18.11.2020    source источник


Ответы (1)


Оказалось, что это снижение производительности в Dask, которое было исправлено в версии 2021.03.0.

Дополнительную информацию см. В этой проблеме Github.

person Sinclair    schedule 16.04.2021