Я использую Dask на одной машине (LocalCluster
с 4 процессами, 16 потоками, 68,56 ГБ памяти), и у меня возникают проблемы с рабочей памятью при попытке вычислить сразу два результата, которые имеют общую зависимость.
В примере, показанном ниже, вычисление result
с помощью всего одного вычисления выполняется нормально и быстро, при этом максимальное использование памяти рабочими процессами составляет около 1 ГБ. Однако при вычислении results
с двумя вычислениями рабочие быстро используют всю свою память и начинают запись на диск, когда общее использование памяти составляет около 40 ГБ. Вычисления в конечном итоге завершатся, но после начала записи на диск произойдет резкое замедление, как и следовало ожидать.
Интуитивно понятно, что если один фрагмент считывается, а затем сразу вычисляются две его суммы, то этот фрагмент можно отбросить, а использование памяти останется низким. Однако похоже, что Даск отдает приоритет загрузке данных, а не более поздним агрегированным вычислениям, которые очищают память.
Любая помощь в понимании того, что здесь происходит, будет принята с благодарностью. Как я могу вычислить два результата с общей зависимостью без необходимости дважды читать базовые данные или полностью считывать их в память?
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
client = Client("localhost:8786")
array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])
# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())
# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])