Многопроцессорные зомби-процессы Python

У меня есть простая реализация модуля многопроцессорной обработки Python.

if __name__ == '__main__':
jobs = []

while True:
    for i in range(40):
        # fetch one by one from redis queue
        #item = item from redis queue
        p = Process(name='worker '+str(i), target=worker, args=(item,))

        # if p is not running, start p
        if not p.is_alive():
            jobs.append(p)
            p.start()

    for j in jobs:
        j.join()
        jobs.remove(j)


def worker(url_data):
    """worker function"""
    print url_data['link']

Что я ожидаю от этого кода:

  1. запустите в бесконечном цикле, продолжайте ждать очереди Redis.
  2. если очередь Redis не пуста, извлечь элемент.
  3. создать 40 multiprocess.Process, не более не менее
  4. если процесс завершил обработку, запустите новый процесс, чтобы постоянно выполнялось около 40 процессов.

Я читал, что, чтобы избежать зомби-процесса, который должен быть привязан (присоединен) к родителю, я ожидал этого во втором цикле. Но проблема в том, что при запуске он порождает 40 процессов, воркеры заканчивают обработку и входят в состояние зомби, пока все текущие порожденные процессы не завершатся, а затем в следующей итерации "пока True" тот же шаблон продолжается.

Итак, мой вопрос: как я могу избежать зомби-процессов. и запускать новый процесс, как только завершится 1 из 40


person Sachin Upmanyu    schedule 15.06.2015    source источник


Ответы (1)


Для задачи, подобной той, что вы описали, обычно лучше использовать другой подход, используя Pool.

Вы можете сделать так, чтобы основной процесс извлекал данные, а рабочие обрабатывали их.

Следуя примеру Pool из Python Docs

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

Я также предлагаю использовать imap вместо map, так как кажется, что ваша задача может быть асинхронной.

Примерно ваш код будет:

p = Pool(40)

while True:
  items = items from redis queue
  p.imap_unordered(worker, items) #unordered version is faster


def worker(url_data):
  """worker function"""
  print url_data['link']
person Paolo Casciello    schedule 15.06.2015
comment
работает отлично, использование памяти немного выше, чем у процесса, хотя я совсем новичок в python, вы уверены, что мне следует использовать пул вместо процесса - person Sachin Upmanyu; 15.06.2015
comment
Конечно, это зависит от вашей конкретной задачи. Как правило, вы перекладываете работу на пул рабочих. Вам не нужно общаться с работниками напрямую во время их работы. Пулы были созданы для такого типа сценариев. :) - person Paolo Casciello; 15.06.2015