`multiprocessing.Pool.map()` кажется, неправильно расписан

У меня есть функция, которая запрашивает сервер, извлекает некоторые данные, обрабатывает их и сохраняет файл csv. Эта функция должна быть запущена 20k раз. Каждая казнь длится по-разному: иногда она длится более 20 минут, а иногда менее секунды. Я решил пойти с multiprocessing.Pool.map, чтобы распараллелить выполнение. Мой код выглядит так:

def get_data_and_process_it(filename):
    print('getting', filename)
    ...
    print(filename, 'has been process')

with Pool(8) as p:
    p.map(get_data_and_process_it, long_list_of_filenames)

Глядя на то, как генерируются prints, кажется, что long_list_of_filenames он был разделен на 8 частей и присвоен каждому CPU, потому что иногда он просто блокируется за одно 20-минутное выполнение, и никакой другой элемент long_list_of_filenames не обрабатывается за эти 20 минут. Я ожидал, что map будет планировать каждый элемент в ядре процессора в стиле FIFO.

Есть ли лучший подход для моего случая?


comment
В этом случае вы должны установить параметр chunksize для Pool.map() на 1. Вы можете рассчитать сгенерированные иначе размеры фрагментов с помощью calc_chunksize_info() из моего ответа здесь.   -  person Darkonaut    schedule 01.08.2019
comment
map работает аналогично встроенному map для итерируемых объектов. Это означает, что заказ обеспечен. Другими словами, медленный процесс блокирует более быстрые процессы. Если порядок для вас не имеет значения, я предлагаю вместо этого изучить map_async.   -  person Bram Vanroy    schedule 01.08.2019


Ответы (2)


Метод map возвращает значение только после завершения всех операций.

Да и печать из пул-воркера не идеальна. Во-первых, такие файлы, как stdout, используют буферизацию, поэтому между печатью сообщения и его фактическим отображением может пройти разное время. Более того, поскольку все рабочие наследуют один и тот же stdout, их выходные данные будут переплетены и, возможно, даже искажены.

Поэтому я бы предложил вместо этого использовать imap_unordered. Это возвращает итератор, который начнет выдавать результаты, как только они станут доступны. Единственная загвоздка в том, что это возвращает результаты в том порядке, в котором они заканчиваются, а не в том порядке, в котором они начинались.

Ваша рабочая функция (get_data_and_process_it) должна возвращать какой-то индикатор состояния. Например, кортеж из имени файла и результата.

def get_data_and_process_it(filename):
    ...
    if (error):
        return (filename, f'has *failed* bacause of {reason}')
    return (filename, 'has been processed')

Затем вы можете сделать:

with Pool(8) as p:
   for fn, res in p.imap_unordered(get_data_and_process_it, long_list_of_filenames):
       print(fn, res)

Это дает точную информацию о том, когда задание завершается, и, поскольку только родительский процесс записывает в stdout, вывод не становится искаженным.

Кроме того, я бы предложил использовать sys.stdout.reconfigure(line_buffering=True) где-то в начале вашей программы. Это гарантирует, что поток stdout будет очищаться после каждой строки вывода.

person Roland Smith    schedule 02.08.2019

map блокирует, вместо p.map можно использовать p.map_async. map будет ждать завершения всех этих вызовов функций, чтобы мы видели все результаты подряд. map_async выполняет работу в случайном порядке и не ждет завершения текущей задачи перед запуском новой задачи. Это самый быстрый подход. (подробнее) также является потоком SO, в котором подробно обсуждаются map и map_async.

Многопроцессорный класс Pool обрабатывает для нас логику организации очереди. Он идеально подходит для параллельного выполнения заданий по очистке веб-страниц (пример) или действительно любого задания, которое можно разделить и распределить независимо. Если вам нужен больший контроль над очередью или обмен данными между несколькими процессами, вы можете взглянуть на класс Queue(Подробнее).

person j23    schedule 02.08.2019