Как передать ссылку на очередь функции, управляемой pool.map_async()?

Я хочу, чтобы длительный процесс возвращал свой прогресс по очереди (или что-то подобное), который я буду передавать в диалоговое окно индикатора выполнения. Мне также нужен результат, когда процесс завершен. Тестовый пример здесь не работает с RuntimeError: Queue objects should only be shared between processes through inheritance.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Мне удалось заставить это работать, используя отдельные объекты Process (где мне можно передавать ссылку на очередь), но тогда у меня нет пула для управления многими процессами, которые я хочу запустить . Любые советы по лучшему шаблону для этого?


person David    schedule 09.07.2010    source источник
comment
Это не ответ на ваш вопрос, но попробуйте библиотеку execnetcodespeak.net/execnet› для мульти- сопоставления процессов. У встроенного multiprocessing есть некоторые проблемы, которые еще предстоит исправить (см. Трекер Python). Кроме того, его исходный код довольно большой и сложный. Библиотека execnet мне кажется намного лучше, чем multiprocessing.   -  person Andrey Vlasovskikh    schedule 10.07.2010


Ответы (2)


Кажется, работает следующий код:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Обратите внимание, что очередь получается из manager.Queue(), а не multiprocessing.Queue(). Спасибо Алекс за то, что указал мне в этом направлении.

person David    schedule 12.07.2010
comment
+1 и просто небольшое замечание, что ваш вопрос помог мне решить проблему, которая возникла у меня сегодня. Я нашел версию очереди Manager, но мой код не работал, потому что я полагался на файл global. Его нужно передать как параметр, как вы делаете. - person winwaed; 31.01.2011

Создание q глобального работает...:

import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Если вам нужно несколько очередей, например. чтобы не смешивать ход выполнения различных процессов пула, должен работать глобальный список очередей (конечно, каждый процесс тогда должен будет знать, какой индекс в списке использовать, но это нормально для передачи в качестве аргумента ;-).

person Alex Martelli    schedule 10.07.2010
comment
Будет ли это работать, если задача определена в другом модуле или пакете? Пример кода очень упрощен. Реальная программа имеет архитектуру MVC, в которой конвейер производитель-потребитель настроен на несколько ядер (модель), и ему необходимо отправлять обновления о ходе выполнения в графический интерфейс wxPython (представление). - person David; 10.07.2010
comment
@ Дэвид, ты можешь попробовать; если ваш реальный код не работает таким простым способом, вам нужно подняться на ступеньку выше по сложности и выбрать менеджера (который может дать вам прокси для очередей и т. д.). - person Alex Martelli; 10.07.2010
comment
Кажется, это вообще не работает. q никогда ничего не возвращает q.empty() всегда имеет значение True на моей машине. Даже если я увеличу вызов сна до 10 секунд, что должно быть чрезмерным временем для задачи, чтобы поместить несколько сообщений в очередь, q.empty всегда возвращает True. - person David; 11.07.2010
comment
@ Дэвид, под этим ты имеешь в виду код, который я разместил в своем A? Потому что у меня этот код отлично работает на двухъядерном макбуке с OSX 10.5, Python 2.6.5 или 2.7. Какая у вас платформа? - person Alex Martelli; 11.07.2010
comment
Да, я скопировал код, который вы разместили, и хотя результаты возвращаются (я получаю 10 Done в списке), очередь никогда ничего не возвращает. Отладчик показывает, что q всегда возвращает q.empty() == True. Windows 7, АктивПитон 2.6.5.12 - person David; 12.07.2010
comment
@ Дэвид, как я уже сказал, он отлично работает на Mac (нет Windows, чтобы проверить). Что ж, тогда похоже, что менеджер — единственный приемлемый для вас подход. - person Alex Martelli; 12.07.2010
comment
@Алекс Спасибо. Я посмотрю на менеджеров. - person David; 12.07.2010
comment
Есть веская причина, по которой это работает на Mac, а не на Windows. Контекст по умолчанию для создания новых процессов в Windows — spawn, так как Fork недоступен. - person micsthepick; 23.05.2018