Проблема в apply_async в многопроцессорном пуле

Я использую многопроцессорный пул в Python и его метод .apply_async() для одновременного запуска нескольких рабочих процессов.

Но есть проблема из-за использования with вместо создания произвольного экземпляра.

Вот что я сделал до сих пор:


Фрагмент кода общего раздела:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []
def log_result(result):
    result_list.append(result)

Первый фрагмент кода, который хорошо работает в стиле Python 2:

tick = time()

pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i, ), callback=log_result)
pool.close()
pool.join()

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Вне:

1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5

Второй фрагмент кода, который хорошо работает в стиле Python 3:

tick = time()

with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)

print('Total elapsed time: ', time() - tick)
print(i)  # Indicates that all iteration has been done.

Вне:

0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.017550945281982
5

Дополнительный:

  • Можно сделать вывод, что метод Python 3 быстрее, чем метод Python 2.

Вопрос:

Теперь проблема заключается в том, что я хочу реализовать метод Python 2, используя with, такой как метод Python 3, но задачи не выполнены:

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Вне:

Total elapsed time:  0.10628008842468262
[]
5

Однако, если я поставлю sleep(1) после pool.apply_async(...), некоторые облегченные задачи будут завершены (установление блока):

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Вне:

0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time:  6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5

Что я пропустил?


person Benyamin Jafari    schedule 13.01.2020    source источник


Ответы (1)


concurrent.futures.Executor и multiprocessing.Pool имеют две совершенно разные реализации контекстного менеджера.

concurrent.futures.Executor вызывает shutdown(wait=True), фактически ожидая завершения всех поставленных в очередь заданий в соответствии с документация.

Вы можете избежать явного вызова этого метода, если используете оператор with, который отключит Executor (ожидание, как если бы Executor.shutdown() был вызван с ожиданием, установленным на True)

multiprocessing.Pool вызывает terminate вместо close, а затем join, что приводит к преждевременному прерыванию всех текущих заданий. В документации.

Объекты пула теперь поддерживают протокол управления контекстом — см. Типы диспетчера контекста. enter() возвращает объект пула, а exit() вызывает метод terminate().

Если вы хотите использовать multiprocessing.Pool вместе с его контекстным менеджером, вам нужно дождаться результатов самостоятельно.

with Pool() as pool:
    async_result = pool.apply_async(worker, args=(i,), callback=log_result)
    async_result.wait()
person noxdafox    schedule 13.01.2020