Многопроцессорный общий объект Python3

Я столкнулся с проблемой синхронизации для общего объекта при использовании модуля multiprocessing в Python 3.2.3 (в Debian 7.5). Я собрал этот простой пример, чтобы проиллюстрировать проблему, его функциональность аналогична multiprocessing.Pool.map (самый простой из возможных вариантов). Я использую multiprocessing.Manager, поскольку мой исходный код использует его (синхронизация по сети). Но поведение будет таким же, если я использую простой multiprocessing.Value в качестве переменной счетчика.

import os as os
import sys as sys
import multiprocessing as mp

def mp_map(function, obj_list, num_workers):
    """ 
    """
    mang = mp.Manager()
    jobq = mang.Queue()
    resq = mang.Queue()
    counter = mp.Value('i', num_workers, lock=True)
    finished = mang.Event()
    processes = []
    try:
        for i in range(num_workers):
            p = mp.Process(target=_parallel_execute, kwargs={'execfun':function, 'jobq':jobq, 'resq':resq, 'counter':counter, 'finished':finished})
            p.start()
            p.join(0)
            processes.append(p)
        for item in obj_list:
            jobq.put(item)
        for i in range(len(processes)):
            jobq.put('SENTINEL')
        finished.wait()
        for p in processes:
            if p.is_alive():
                p.join(1)
                p.terminate()
    except Exception as e:
        for p in processes:
            p.terminate()
        raise e
    results = []
    for item in iter(resq.get, 'DONE'):
        results.append(item)
    return results

def _parallel_execute(execfun, jobq, resq, counter, finished):
    """
    """
    for item in iter(jobq.get, 'SENTINEL'):
        item = execfun(item)
        resq.put(item)
    counter.value -= 1
    print('C: {}'.format(counter.value))
    if counter.value <= 0:
        resq.put('DONE')
        finished.set()
    return


if __name__ == '__main__':
    l = list(range(50))
    l = mp_map(id, l, 2)
    print('done')
    sys.exit(0)

Выполнение вышеуказанного кода несколько раз приводит к следующему:

wks:~$ python3 mpmap.py 
C: 1
C: 0
done
wks:~$ python3 mpmap.py 
C: 1
C: 0
done
wks:~$ python3 mpmap.py 
C: 1
C: 1
Traceback (most recent call last):
  File "mpmap.py", line 55, in <module>
    l = mp_map(id, l, 2)
  File "mpmap.py", line 25, in mp_map
    finished.wait()
  File "/usr/lib/python3.2/multiprocessing/managers.py", line 1013, in wait
    return self._callmethod('wait', (timeout,))
  File "/usr/lib/python3.2/multiprocessing/managers.py", line 762, in _callmethod
    kind, result = conn.recv()
KeyboardInterrupt

Основываясь на документации модуля multiprocessing, я не понимаю, почему counter небезопасен для процесса, поскольку доступ к нему осуществляется через Manager, и он явно инициализируется с помощью lock=True. Поскольку тупик случается время от времени, я не совсем уверен, как интерпретировать такое поведение. Мы очень ценим любые полезные идеи, спасибо.

РЕДАКТИРОВАТЬ: Просто так получилось, что я нашел объяснение после того, как погуглил еще немного; Я поделюсь этим здесь, если кому-то еще интересно: на основе этой записи в блоге 1, ссылка на которую приведена ниже, блокировка, выполненная в Python (т.е. в multiprocessing.[Manager].Value с lock=True), не приводит к атомарным операциям с общими значениями, как в примере. Решение состоит в том, чтобы использовать другую блокировку, совместно используемую процессами, которая используется для управления доступом к общим объектам.

[http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/]]»rel=


person Community    schedule 28.07.2014    source источник
comment
counter.value -= 1 не является атомарным, несмотря на lock=True. См. Этот ответ: stackoverflow.com/a/1233363/3826372   -  person Ross Ridge    schedule 28.07.2014
comment
Спасибо за замечание и ссылку; однако нашел ответ, погуглив больше (см. мое редактирование)   -  person    schedule 30.07.2014
comment
Да, я знаю. Вы внесли это изменение одновременно с моим комментарием. В любом случае не стоит отвечать на собственные вопросы, редактируя вопрос. Вы должны опубликовать ответ и принять его.   -  person Ross Ridge    schedule 30.07.2014


Ответы (1)


Как посоветовал Росс, я повторяю здесь ответ: вкратце, lock=True для multiprocessing.Value или multiprocessing.Manager.Value не дает, например, приращение (уменьшение) значения атомарной операции - требуется отдельная блокировка для инкапсуляции всей операции; для примера кода см. этот ответ https://stackoverflow.com/a/1233363/3826372 или вышеупомянутую запись в блоге http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

person Community    schedule 31.07.2014