Dask использует большое количество памяти при вычислении двух значений с общей зависимостью

Я использую Dask на одной машине (LocalCluster с 4 процессами, 16 потоками, 68,56 ГБ памяти), и у меня возникают проблемы с рабочей памятью при попытке вычислить сразу два результата, которые имеют общую зависимость.

В примере, показанном ниже, вычисление result с помощью всего одного вычисления выполняется нормально и быстро, при этом максимальное использование памяти рабочими процессами составляет около 1 ГБ. Однако при вычислении results с двумя вычислениями рабочие быстро используют всю свою память и начинают запись на диск, когда общее использование памяти составляет около 40 ГБ. Вычисления в конечном итоге завершатся, но после начала записи на диск произойдет резкое замедление, как и следовало ожидать.

Интуитивно понятно, что если один фрагмент считывается, а затем сразу вычисляются две его суммы, то этот фрагмент можно отбросить, а использование памяти останется низким. Однако похоже, что Даск отдает приоритет загрузке данных, а не более поздним агрегированным вычислениям, которые очищают память.

Любая помощь в понимании того, что здесь происходит, будет принята с благодарностью. Как я могу вычислить два результата с общей зависимостью без необходимости дважды читать базовые данные или полностью считывать их в память?

import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client

client = Client("localhost:8786")

array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])

# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())

# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])

person user73445    schedule 27.02.2021    source источник


Ответы (1)


Способ построения массива: каждый раз, когда создается чанк, он должен генерировать каждый столбец массива. Таким образом, одна из возможностей оптимизации (если возможно) - это сгенерировать / загрузить массив таким образом, чтобы разрешить обработку по столбцам. Это снизит нагрузку на память при выполнении одной задачи.

Еще одно место для оптимизации - явное указание общих зависимостей, например, dask.compute(df[['0', '1']].sum()) будет работать эффективно.

Однако более важным моментом является то, что по умолчанию dask следует некоторым практическим правилам о том, как расставлять приоритеты в работе, см. здесь. У вас есть несколько вариантов вмешательства (не уверен, является ли этот список исчерпывающим): настраиваемые приоритеты, ограничения ресурсов, изменение графа вычислений (чтобы позволить рабочим освобождать память от промежуточных задач, не дожидаясь завершения последней задачи).

Простой способ изменить график - разбить зависимость между итоговой суммой и всеми промежуточными задачами путем вычисления промежуточных сумм вручную:

[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])

Обратите внимание, что results будет списком из двух подсписок, но вычислить сумму каждого подсписка тривиально (попытка запустить sum на отложенном объекте вызовет вычисление, поэтому более эффективно запускать sum после вычисления results).

person SultanOrazbayev    schedule 27.02.2021
comment
Огромное спасибо за помощь! Ваши предложения привели меня к решению моей проблемы. Мои фактические вычисления были немного более сложными, чем суммирование, и я применял функцию к каждому элементу списка отложенных pd.DataFrame объектов, используя встроенную функцию map. Как и в случае с методом sum в моем примере выше, похоже, что функция map препятствовала эффективному разделению задачи. После переключения с функции map на цикл for или эквивалентный список, система смогла правильно разделить задачу и обработать вычисления, не увеличивая объем памяти. - person user73445; 06.03.2021
comment
Замечательно! Используя .visualize(), можно будет увидеть, есть ли дополнительные преимущества в оптимизации рабочего процесса. - person SultanOrazbayev; 06.03.2021