Python concurrent.futures с использованием подпроцесса, запускающего несколько скриптов Python

Я хочу запустить несколько скриптов Python одновременно, используя concurrent.futures. Серийная версия моего кода ищет определенный файл python в папке и выполняет его.

import re
import os
import glob
import re
from glob import glob
import concurrent.futures as cf

FileList = [];
import time
FileList = [];
start_dir = os.getcwd();
pattern   = "Read.py"

for dir,_,_ in os.walk(start_dir):
    FileList.extend(glob(os.path.join(dir,pattern))) ;

FileList

i=0
for file in FileList:
    dir=os.path.dirname((file))
    dirname1 = os.path.basename(dir) 
    print(dirname1)
    i=i+1
    Str='python '+ file
    print(Str)
    completed_process = subprocess.run(Str)`

для параллельной версии моего кода:

    def Python_callback(future):
    print(future.run_type, future.jid)
    return "One Folder finished executing"

def Python_execute():
    from concurrent.futures import ProcessPoolExecutor as Pool
    args = FileList
    pool = Pool(max_workers=1)
    future = pool.submit(subprocess.call, args, shell=1)
    future.run_type = "run_type"
    future.jid = FileList
    future.add_done_callback(Python_callback)
    print("Python executed")

if __name__ == '__main__':
    import subprocess
    Python_execute()

Проблема в том, что я не уверен, как передать каждый элемент FileList в отдельный процессор.

заранее спасибо за помощь


person Angel_M    schedule 13.03.2018    source источник


Ответы (1)


Наименьшее изменение - использовать submit один раз для каждого элемента, а не один раз для всего списка:

futures = []
for file in FileList:
    future = pool.submit(subprocess.call, file, shell=1)
    future.blah blah
    futures.append(future)

Список futures необходим только в том случае, если вы хотите что-то сделать с фьючерсами - дождаться их завершения, проверить их возвращаемые значения и т. Д.

Между тем вы явно создаете пул с max_workers=1. Неудивительно, что это означает, что вы получите только 1 рабочий дочерний процесс, поэтому он будет ждать завершения одного подпроцесса, прежде чем захватить следующий. Если вы действительно хотите запускать их одновременно, удалите этот max_workers и оставьте его по умолчанию равным одному на ядро ​​(или передайте max_workers=8 или другое число, отличное от 1, если у вас есть веская причина изменить значение по умолчанию).


Пока мы занимаемся этим, есть много способов упростить то, что вы делаете:

  • Вам действительно здесь нужен multiprocessing? Если вам нужно взаимодействовать с каждым подпроцессом, это может быть болезненно делать в одном потоке, но потоки или, возможно, asyncio будут работать так же хорошо, как и процессы здесь.
  • Более того, похоже, что вам на самом деле ничего не нужно, кроме как запустить процесс и дождаться его завершения, и это можно сделать с помощью простого синхронного кода.
  • Почему вы создаете строку и используете shell=1 вместо того, чтобы просто передавать список и не использовать оболочку? Излишнее использование оболочки создает накладные расходы, проблемы с безопасностью и неудобства при отладке.
  • Вам действительно не нужно jid для каждого будущего - это просто список всех ваших строк вызова, который бесполезен. Что может быть более полезным, так это какой-то идентификатор, или код возврата подпроцесса, или… возможно, множество других вещей, но все это можно сделать, прочитав возвращаемое значение subprocess.call или простую оболочку.
  • Вам действительно не нужен обратный вызов. Если вы просто соберете все фьючерсы в список и as_completed его, вы сможете распечатать результаты по мере их упрощения.
  • Если вы выполните оба вышеуказанных действия, у вас не останется ничего, кроме pool.submit внутри цикла, что означает, что вы можете заменить весь цикл на pool.map.
  • Вам редко нужно смешивать os.walk и glob. Когда у вас действительно есть шаблон глобуса, примените fnmatch к списку files из os.walk. Но здесь вы просто ищете конкретное имя файла в каждом каталоге, поэтому на самом деле все, что вам нужно для фильтрации, это file == 'Read.py'.
  • Вы не используете i в своем цикле. Но если вам это действительно нужно, лучше сделать for i, file in enumerate(FileList):, чем делать for file in FileList: и вручную увеличивать i.
person abarnert    schedule 13.03.2018