Параллельная обработка с ProcessPoolExecutor

У меня есть огромный список элементов, которые нужно как-то обработать. Я знаю, что это можно сделать с помощью Process из многопроцессорности:

pr1 = Process(calculation_function, (args, ))
pr1.start()
pr1.join()

и поэтому я могу создать, скажем, 10 процессов и передать аргументы, разделенные на 10, в args. И тогда работа сделана.

Но я не хочу создавать его вручную и вычислять вручную. Вместо этого я хочу использовать ProcessPoolExecutor, и я делаю это следующим образом:

executor = ProcessPoolExecutor(max_workers=10)
executor.map(calculation, (list_to_process,))

расчет моя функция, которая делает работу.

def calculation(list_to_process):
    for element in list_to_process:
        # .... doing the job

list_to_process — мой список для обработки.

Но вместо этого после запуска этого кода итерация в цикле выполняется только один раз. я думал так

executor = ProcessPoolExecutor(max_workers=10)
executor.map(calculation, (list_to_process,))

то же самое, что и это 10 раз:

pr1 = Process(calculation, (list_to_process, ))
pr1.start()
pr1.join()

Но, похоже, это неправильно.

Как добиться реальной многопроцессорности с помощью ProcessPoolExecutor?


person John    schedule 21.10.2017    source источник


Ответы (1)


Удалите цикл for из функции calculation. Теперь, когда вы используете ProcessPoolExecutor.map, этот вызов map() является вашим циклом, разница в том, что каждый элемент в списке отправляется другому процессу. Например.

def calculation(item):
    print('[pid:%s] performing calculation on %s' % (os.getpid(), item))
    time.sleep(5)
    print('[pid:%s] done!' % os.getpid())
    return item ** 2

executor = ProcessPoolExecutor(max_workers=5)
list_to_process = range(10)
result = executor.map(calculation, list_to_process)

Вы увидите что-то в терминале, например:

[pid:23988] performing calculation on 0
[pid:10360] performing calculation on 1
[pid:13348] performing calculation on 2
[pid:24032] performing calculation on 3
[pid:18028] performing calculation on 4
[pid:23988] done!
[pid:23988] performing calculation on 5
[pid:10360] done!
[pid:13348] done!
[pid:10360] performing calculation on 6
[pid:13348] performing calculation on 7
[pid:18028] done!
[pid:24032] done!
[pid:18028] performing calculation on 8
[pid:24032] performing calculation on 9
[pid:23988] done!
[pid:10360] done!
[pid:13348] done!
[pid:18028] done!
[pid:24032] done!

Хотя порядок событий будет фактически случайным. Возвращаемое значение (по крайней мере, в моей версии Python) на самом деле представляет собой itertools.chain объект по какой-то причине. Но это детали реализации. Вы можете вернуть результат в виде списка, например:

>>> list(result)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

В вашем примере кода вместо этого вы передали одноэлементный кортеж (list_to_process,), поэтому он просто передаст ваш полный список одному процессу.

person Iguananaut    schedule 21.10.2017
comment
Спасибо за ваш ответ! Я не совсем понимаю.. Где должна быть итерация по list_to_process? Поэтому я должен использовать один элемент из моего списка в цикле for - person John; 21.10.2017
comment
@John нигде, executor.map уже перебирает каждый элемент в списке и применяет его в качестве аргумента для функции вычисления - person Yaroslav Surzhikov; 21.10.2017
comment
Как я объяснил, итерация выполняется ProcessPoolExecutor.map(). Это в основном эквивалентно: for item in list_to_process: calculation(item), за исключением того, что calculation может вызываться в процессе различия для каждого элемента. - person Iguananaut; 21.10.2017
comment
Поэкспериментируйте со встроенной функцией map и убедитесь, что вы понимаете как это работает. ProcessPoolExecutor.map делает то же самое, но каждый расчет передается другому процессу, а затем результаты собираются в правильном порядке. - person Iguananaut; 21.10.2017
comment
Что вы подразумеваете под отсутствием итерации? Если все ваши процессы работают, то все они приносят результаты. Если вы хотите получить окончательный результат, вам нужно присвоить возвращаемое значение executor.map переменной. Я думаю, что само возвращаемое значение является итерируемым типом, поэтому вам, возможно, придется обернуть его в list(), чтобы получить фактический объект list. - person Iguananaut; 21.10.2017
comment
Огромное спасибо за это! Мне здорово помогло. Также, заметка для тех, кто в подобной ситуации. Эта карта () с несколькими итерациями, итератор останавливается, когда исчерпана самая короткая итерация. Итак, если у вас есть аргумент, который будет постоянным для всех циклов, вам нужно будет обратиться к этому: stackoverflow.com /a/10834984/2408212 - person Xonshiz; 07.12.2019
comment
Кое-что, что я мог бы также отметить, это то, что в некоторых случаях нормально иметь внутренний цикл в вашей отображаемой функции. При многопроцессорной обработке также необходимо учитывать накладные расходы, связанные с межпроцессным взаимодействием, отправкой входных данных процессам и получением результатов. Иногда может быть более эффективно отправлять аргументы пакетами (где ваш основной процесс обрабатывает пакетную обработку). Однако для этого нет жестких и быстрых правил, и это может потребовать экспериментов. - person Iguananaut; 08.12.2019
comment
Как правило, вы обнаружите, что по мере увеличения количества процессов вы получаете нелинейную эффективность, отчасти из-за этих накладных расходов, но вы можете повысить эффективность при большем количестве процессов с помощью пакетной обработки. Если вы собираетесь выполнять очень крупномасштабные многопроцессорные вычисления, полезно поэкспериментировать и попытаться оптимизировать такие гиперпараметры, как размер пакета. - person Iguananaut; 08.12.2019