Эта проблема
Я пытаюсь отправить объект CPython размером 2 ГБ только для чтения (можно мариновать) распределенным рабочим dask через apply()
. Это приводит к потреблению большого количества памяти для процессов / потоков (14+ ГБ).
Есть ли способ загрузить объект в память только один раз и одновременно использовать его рабочими?
Подробнее о проблеме
У меня есть 2 серии Dask Source_list и Pattern_list, содержащие 7 миллионов и 3 миллиона строк соответственно. Я пытаюсь найти все совпадения подстрок в Source_list (7M) из Pattern_list (3M).
Чтобы ускорить поиск подстроки, я использую пакет pyahocorasick для создания структуры данных Cpython. (объект класса) из Pattern_list (объект можно мариновать).
Вещи, которые я пробовал
- работа с одним планировщиком dask занимает около 2,5 часов, но завершается с правильными результатами.
- запуск с распределенным dask обычно приводит к:
distributed.worker - WARNING - Memory use is high but worker has no data to
store to disk. Perhaps some other process is leaking memory? Process memory:
2.85 GB -- Worker memory limit: 3.00 GB
работает с распределенным dask с ограничением памяти, увеличенным до 8 ГБ / 16 ГБ:
Темы
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 14.5 GB -- Worker memory limit: 16.00 GB distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
Процессы. На обработку уходит более 2,5 часов, и я никогда не видел, чтобы она завершалась (оставил работающим более 8 часов, прежде чем отменить). Он также потребляет более 10 ГБ памяти.
- Использование векторизованной строковой операции
Source_list.str.find_all(Pattern_list)
занимает более 2,5 часов. - Сохранение объекта в глобальной переменной и его вызов приводит к той же ошибке, что и в пункте 3 для процессов и потоков.
- Использование map_partitions + loop / map в Source_list дает те же результаты, что и пункт 3.
Распределенный код Dask
# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1
import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress
def create_ahocorasick_trie(pattern_list):
A = ahocorasick.Automaton()
for index, item in pattern_list.iteritems():
A.add_word(item,item)
A.make_automaton()
return A
if __name__ == '__main__':
client = Client(memory_limit="12GB",processes=False)
# Using Threading, because, the large_object seems to get copied in memory
# for each process when processes = True
Source_list = dd.read_parquet("source_list.parquet")
Pattern_list = dd.read_parquet("pattern_list.parquet")
# Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask
large_object = create_ahocorasick_trie(Pattern_list)
result = Source_list.apply(lambda source_text: {large_object.iter(source_text)}, meta=(None,'O'))
# iter() is an ahocorasick Cpython method
progress(result.head(10))
client.close()