Как запускать вложенные, иерархические пафосные многопроцессорные карты?

Построив значительную часть своего кода на укропной сериализации/мариновании, я также пытаюсь использовать пафосную многопроцессорность для распараллеливания своих вычислений. Пафос это естественное продолжение укропа.

При попытке запустить вложенный

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

внутри другого ProcessingPool().map, то я получаю:

AssertionError: daemonic processes are not allowed to have children

E.g.:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ProcessingPool().map(refork, xrange(3))

урожаи

AssertionError: daemonic processes are not allowed to have children

Я пытался использовать amap(...).get() безуспешно. Это на пафосной 0.2.0.

Каков наилучший способ разрешить вложенное распараллеливание?

Обновить

Я должен быть честным в этом месте, и признаться, что я удалил утверждение "daemonic processes are not allowed to have children" из пафоса. Я также построил что-то, что каскадирует KeyboardInterrupt для рабочих и рабочих из тех... Части решения ниже:

def run_parallel(exec_func, exec_args, num_workers_i)
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    pid_is = pool.map(get_pid_i, xrange(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        cls.hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise


def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

Кажется, работает с консоли и ноутбука IPython (с кнопкой остановки), но не уверен, что это на 100% правильно во всех крайних случаях.


person Mark Horvath    schedule 15.10.2016    source источник
comment
Я pathos автор. Причина, по которой у вас не может быть процессов, порождающих процессы, заключается в том, что они не умирают должным образом, и у вас есть процессы-зомби, которые в конечном итоге зависнут. Я бы порекомендовал решение @Yoda, так как это типичный случай… один дорогой параллельный блок и несколько легких параллельных битов работы. pathos также имеет ParallelPool, который медленнее, но работает, если вам нужно что-то кроме потоков. Я бы также посоветовал поэкспериментировать с неблокирующими картами, так как блокировка может замедлить вас. Также см.: stackoverflow.com/questions/28203774   -  person Mike McKerns    schedule 28.11.2016
comment
@MikeMcKerns, я начал экспериментировать с кодом разными способами (включая процессы, не связанные с демоном), и в итоге все получилось так, как описано выше. Также включен amap, но по другой причине Ctrl+C не вытащил меня из map. К сожалению, не могу использовать облегченный трюк, так как это уже была более крупная система на момент поиска пафоса (после укропа). Теперь следующая задача - иметь какую-то общую память (чтение и запись всех процессов), что кажется сложным с использованием моего каскадного решения... Кстати, отличный инструмент, спасибо!   -  person Mark Horvath    schedule 29.11.2016
comment
Я не могу представить, какой у вас будет рабочий процесс, когда вы не можете использовать один из других пулов (ThreadingPool или ParallelPool) для обеспечения вложенного параллелизма и потребует иерархии ProcessingPools… но, возможно, у вас есть допустимый вариант использования. Я не думал об этом и был бы не против узнать об этом больше (может быть, в виде билета на странице pathos github). Да, удалив утверждение, вложенные ProcessingPools должны работать. Однако причина утверждения заключается в том, что вложенные порожденные пулы, как правило, живут как зомби. Уничтожение процессов-зомби с использованием их идентификатора задания является обходным путем.   -  person Mike McKerns    schedule 29.11.2016
comment
Просто понимаю ваше первоначальное предложение, извините. ParallelPool на самом деле выглядит идеально! Прямо сейчас код может просто разветвлять новые процессы везде, где ему нужно (после проверки достаточности ресурсов). Я мог бы построить диспетчер как сервер на основе сокетов, который принимал бы на выполнение маринованные задания. Ничего невозможного, просто нужен рефакторинг. Спасибо!   -  person Mark Horvath    schedule 30.11.2016
comment
Хорошо, это здорово. Вам следует ответить на свой вопрос, если вы чувствуете, что нашли лучший ответ, чем тот, что был представлен до сих пор.   -  person Mike McKerns    schedule 30.11.2016


Ответы (1)


Я столкнулся с точно такой же проблемой. В моем случае внутренней операции требовался параллелизм, поэтому я сделал ThreadingPool из ProcessingPool. Вот на вашем примере:

from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ThreadingPool().map(refork, xrange(3))

Вы даже можете иметь еще один слой с другим внешним пулом потоков. В зависимости от вашего случая вы можете инвертировать порядок этих пулов. Однако у вас не может быть процессов процессов. Если это действительно необходимо, см.: https://stackoverflow.com/a/8963618/6522112. Я еще не пробовал это сам, поэтому я не могу подробно рассказать об этом.

person tupui    schedule 28.11.2016
comment
Имеет смысл, к сожалению, в моем случае я не могу сказать, какой уровень будет сильно рассчитываться заранее, и я не могу легко ограничиться двумя уровнями распараллеливания. - person Mark Horvath; 29.11.2016
comment
Решение, которое я дал, похоже, работает, но иногда кажется, что внешний пул зависает навсегда. Я пробовал imap и amap безуспешно. Может быть, @MikeMcKerns сможет это просветить? ParallelPool может помочь? - person tupui; 29.11.2016
comment
По сути, multiprocessing.Pool python не убивает чисто, когда его родитель был убит. multiprocess и, следовательно, pathos имеет ту же проблему, потому что они повторно используют один и тот же код. pathos.pools.ParallelPool не является ответвлением от multiprocessing (вместо этого оно происходит от pp), поэтому он не страдает той же проблемой… однако сериализация слабее (она выполняется путем извлечения исходного кода, а не травления, и не позволяет использовать общие объекты). - person Mike McKerns; 30.11.2016
comment
К вашему сведению: исходное извлечение — dill.source, а обычное травление — dill. - person Mike McKerns; 30.11.2016
comment
Как насчет выполнения pool.terminate() и pool.join() ? Насколько я понял, это убьет ребенка должным образом. Я видел вариант timeout, но нет возможности определить, осталась ли работа. Если есть, можно сделать - person tupui; 30.11.2016
comment
Не знаю, как редактировать комментарий... Так что, если есть возможность определить, осталась ли работа (не видел ее), можно было бы инициировать завершение пула вручную. Но получаем ли мы результаты, если делаем это? Кроме того, я думаю, что проблема, которую я упомянул @MikeMcKerns, была связана с imap внутри imap. Любое понимание? Я посмотрю на pathos.pools.ParallelPool. - person tupui; 30.11.2016
comment
@Y0da: imap и map используют один и тот же код под капотом. По сути, map — это просто imap, который выгружается в список. Асинхронная карта (amap) использует другой базовый объект и, следовательно, может иметь другое поведение, чем map и imap. Обратите внимание, что объект результатов из amap имеет get и должен иметь метод ready для проверки завершения всех заданий. - person Mike McKerns; 30.11.2016
comment
Давайте продолжим обсуждение в чате. - person tupui; 30.11.2016