os.scandir и многопроцессорность - ThreadPool работает, но многопроцессорный пул - нет

У меня есть задача в скрипте Python, который раньше был в основном привязан к вводу-выводу, поэтому я использовал ThreadPools, и все работало нормально. Теперь моя задача становится все более связанной с процессором, поэтому я хотел переключиться на пулы с несколькими процессами.

Я думал, что оба интерфейса ведут себя практически одинаково, поэтому я просто переключил импорт, и все готово. Однако внезапно моя рабочая функция больше не выполняется в пуле.

Попробовав пару вещей, это похоже на тот факт, что я передаю DirEntry из os.scandir () моей рабочей функции. При замене «записи» жестко запрограммированной строкой моя рабочая функция выполняется. Заменив обратно на вход, перестает работать. Заменив импорт на ThreadPool, он снова работает.

# This works.
from multiprocessing.pool import ThreadPool as Pool
import os

pool_size = 3

def worker(entry):
    print("Did some useful stuff!")

pool = Pool(pool_size)

for entry in os.scandir("Samples/"):
    if entry.is_file():
        pool.apply_async(worker, (entry,))

pool.close()
pool.join()

print("Finished multiprocessing task.")

Вывод:

Did some useful stuff! (~150x)
Finished multiprocessing task.

Замените from multiprocessing.pool import ThreadPool as Pool на from multiprocessing import Pool, теперь я получаю только следующий результат:

Finished multiprocessing task.

Теперь, если я вставлю случайную строку вместо записи из цикла в pool.apply_async(worker, (entry,)), например, pool.apply_async(worker, ("Why does this work?",)), рабочая функция работает и возвращает тот же результат, что и ThreadPools, но, очевидно, с аргументом, который я не хочу использовать в моем фактическом скрипте.

Что тут происходит?


person Nirusu    schedule 09.09.2019    source источник


Ответы (1)


Проблема в том, что все, что передается дочернему процессу, обрабатывается, и это не работает для DirEntry в результате scandir. К сожалению, с apply_async вы не увидите соответствующих сбоев. Вы бы сделали это простым apply, и именно так я отследил это, как только вы увидите, что происходит, это действительно имеет смысл:

TypeError: can't pickle posix.DirEntry objects

В зависимости от того, что вы хотите, вы можете передать entry.path или другой атрибут (ы) (которые можно мариновать, так что на самом деле также name, иначе вам придется использовать возвращаемые значения его методов) DirEntry в свой рабочий, и ваш код должен работать ОК, как есть.


Что касается изучения сбоев, в качестве альтернативы вы можете написать небольшую функцию, например:

def print_failed(caught):
    traceback.print_exc(file=sys.stderr)

И зарегистрируйте его в своем apply_async вызове, добавив: error_callback=print_failed.

person Ondrej K.    schedule 09.09.2019
comment
Ух, это в основном показывает, что пулы потоков и пулы не вполне напрямую заменяются друг другом. Вроде облом, потому что мне понравилась возможность переключаться между именем файла и полным путем с DirEntry объектами. Но да, похоже, мне нужно немного изменить свой код, чтобы работать с атрибутами, а не со всем объектом. Спасибо за вашу помощь! - person Nirusu; 09.09.2019
comment
Да, но они действительно не могут быть одинаковыми, потому что потоки используют одно и то же адресное пространство, и, следовательно, передача контента намного проще. После того, как вы создаете отдельные процессы, передача должна происходить через границы процесса. Если способность переключаться важна, вам нужно придерживаться маринованных вещей, которые подойдут в любом случае. - person Ondrej K.; 09.09.2019