Создание пакета dask из генератора

Я хотел бы создать dask.Bag (или dask.Array) из списка генераторов. Проблема в том, что генераторы (при оценке) слишком велики для памяти.

delayed_array = [delayed(generator) for generator in list_of_generators]
my_bag = db.from_delayed(delayed_array)

NB list_of_generators именно так - генераторы не израсходованы (пока).

Моя проблема в том, что при создании delayed_array генераторы расходуются, а оперативная память исчерпывается. Есть ли способ поместить эти длинные списки в Bag, не потребляя их предварительно или, по крайней мере, не используя их по частям, чтобы использовать ОЗУ на низком уровне?

NNB Я мог записать генераторы на диск, а затем загрузить файлы в Bag - но я подумал, что смогу использовать dask, чтобы обойти это?


person danodonovan    schedule 14.06.2018    source источник
comment
Я почти уверен, что from_delayed ожидает, что каждая часть будет достаточно маленькой, чтобы поместиться в памяти, поэтому единственный способ обойти это будет (а) разбить генераторы на части с islice, чтобы у вас было в 10 раз больше генераторов, 1/10 размера (и затем измените форму, если необходимо, после построения), или (b) оберните каждый генератор во что-то, что создает массив (или что-то еще компактное) итеративно, чтобы вы могли передавать эти массивы в dask вместо генераторов, или (c) вместо этого используйте диск памяти, как вы предложили. Нет ничего тривиального, так что… надеюсь, у кого-то есть лучшее решение.   -  person abarnert    schedule 14.06.2018
comment
Конечно (c) на самом деле является просто частным случаем (b), поскольку все функции numpy и pandas для чтения файлов делают это, читая файл итеративно и создавая массив / серию / фрейм данных без предварительного создания списка для преобразования, но если вы не хотите использовать диск, вы должны сами написать эту часть итеративного построения (либо путем подачи fromiter, либо явного цикла и добавления строк, или чего-то еще). Но есть еще одна возможность: если у вас достаточно памяти для хранения каждого компактного двоичного файла в памяти, вместо записи на диск вы можете записать в BytesIO, а затем прочитать его.   -  person abarnert    schedule 14.06.2018


Ответы (1)


Приличное подмножество Dask.bag может работать с большими итераторами. Ваше решение почти идеальное, но вам нужно будет предоставить функцию, которая создает ваши генераторы при вызове, а не сами генераторы.

In [1]: import dask.bag as db

In [2]: import dask

In [3]: b = db.from_delayed([dask.delayed(range)(i) for i in [100000000] * 5])

In [4]: b
Out[4]: dask.bag<bag-fro..., npartitions=5>

In [5]: b.take(5)
Out[5]: (0, 1, 2, 3, 4)

In [6]: b.sum()
Out[6]: <dask.bag.core.Item at 0x7f852d8737b8>

In [7]: b.sum().compute()
Out[7]: 24999999750000000

Однако, безусловно, есть способы, которыми это может вас укусить. Некоторые несколько более сложные операции с пакетами dask действительно нуждаются в бетонировании перегородок, что может привести к сбою оперативной памяти.

person MRocklin    schedule 14.06.2018
comment
(извините за задержку с принятием). Да, у меня это сработало - к сожалению, мои генераторы были существенно разной длины (и это работало не очень хорошо), но это не тот вопрос, который я задавал! - person danodonovan; 09.07.2018