Как разделить большой объект, доступный только для чтения, между распределенными воркерами Dask

Эта проблема

Я пытаюсь отправить объект CPython размером 2 ГБ только для чтения (можно мариновать) распределенным рабочим dask через apply(). Это приводит к потреблению большого количества памяти для процессов / потоков (14+ ГБ).

Есть ли способ загрузить объект в память только один раз и одновременно использовать его рабочими?

Подробнее о проблеме

У меня есть 2 серии Dask Source_list и Pattern_list, содержащие 7 миллионов и 3 миллиона строк соответственно. Я пытаюсь найти все совпадения подстрок в Source_list (7M) из Pattern_list (3M).

Чтобы ускорить поиск подстроки, я использую пакет pyahocorasick для создания структуры данных Cpython. (объект класса) из Pattern_list (объект можно мариновать).

Вещи, которые я пробовал

  1. работа с одним планировщиком dask занимает около 2,5 часов, но завершается с правильными результатами.
  2. запуск с распределенным dask обычно приводит к:
distributed.worker - WARNING - Memory use is high but worker has no data to 
store to disk. Perhaps some other process is leaking memory? Process memory:  
2.85 GB -- Worker memory limit: 3.00 GB
  1. работает с распределенным dask с ограничением памяти, увеличенным до 8 ГБ / 16 ГБ:

    • Темы

      distributed.worker - WARNING - Memory use is high but worker has no 
      data to  store to disk. Perhaps some other process is leaking 
      memory? 
      Process memory:  14.5 GB -- Worker memory limit: 16.00 GB 
      distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
      
    • Процессы. На обработку уходит более 2,5 часов, и я никогда не видел, чтобы она завершалась (оставил работающим более 8 часов, прежде чем отменить). Он также потребляет более 10 ГБ памяти.

  2. Использование векторизованной строковой операции Source_list.str.find_all(Pattern_list) занимает более 2,5 часов.
  3. Сохранение объекта в глобальной переменной и его вызов приводит к той же ошибке, что и в пункте 3 для процессов и потоков.
  4. Использование map_partitions + loop / map в Source_list дает те же результаты, что и пункт 3.

Распределенный код Dask

# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1

import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress

def create_ahocorasick_trie(pattern_list):
    A = ahocorasick.Automaton()
    for index, item in pattern_list.iteritems():
         A.add_word(item,item)
    A.make_automaton()
    return A 

if __name__ == '__main__':
    client = Client(memory_limit="12GB",processes=False)

    # Using Threading, because, the large_object seems to get copied in memory 
    # for each process when processes = True

    Source_list = dd.read_parquet("source_list.parquet") 
    Pattern_list = dd.read_parquet("pattern_list.parquet")

    # Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask

    large_object = create_ahocorasick_trie(Pattern_list)

    result = Source_list.apply(lambda source_text: {large_object.iter(source_text)}, meta=(None,'O'))

    # iter() is an ahocorasick Cpython method

    progress(result.head(10))

    client.close()





person Hyperspace    schedule 09.02.2019    source источник


Ответы (1)


Короткий ответ - обернуть его в вызов dask.delayed

big = dask.delayed(big)
df.apply(func, extra=big)

Dask будет перемещать его по мере необходимости и рассматривать как отдельный фрагмент данных. При этом он должен существовать на каждом воркере, поэтому у вас должно быть значительно больше оперативной памяти на одного воркера, чем эта штука занимает. (как минимум в 4 раза больше).

person MRocklin    schedule 20.02.2019
comment
Могу ли я использовать ту же технику для map или map_partitions? Я не вижу аргумента extra= в API. - person spiralarchitect; 28.12.2019
comment
Да, это работает и для этих методов. Я использую термин extra выше в качестве заполнителя для любого аргумента ключевого слова вашей пользовательской функции. - person MRocklin; 29.12.2019