Я столкнулся с проблемой синхронизации для общего объекта при использовании модуля multiprocessing
в Python 3.2.3 (в Debian 7.5). Я собрал этот простой пример, чтобы проиллюстрировать проблему, его функциональность аналогична multiprocessing.Pool.map
(самый простой из возможных вариантов). Я использую multiprocessing.Manager
, поскольку мой исходный код использует его (синхронизация по сети). Но поведение будет таким же, если я использую простой multiprocessing.Value
в качестве переменной счетчика.
import os as os
import sys as sys
import multiprocessing as mp
def mp_map(function, obj_list, num_workers):
"""
"""
mang = mp.Manager()
jobq = mang.Queue()
resq = mang.Queue()
counter = mp.Value('i', num_workers, lock=True)
finished = mang.Event()
processes = []
try:
for i in range(num_workers):
p = mp.Process(target=_parallel_execute, kwargs={'execfun':function, 'jobq':jobq, 'resq':resq, 'counter':counter, 'finished':finished})
p.start()
p.join(0)
processes.append(p)
for item in obj_list:
jobq.put(item)
for i in range(len(processes)):
jobq.put('SENTINEL')
finished.wait()
for p in processes:
if p.is_alive():
p.join(1)
p.terminate()
except Exception as e:
for p in processes:
p.terminate()
raise e
results = []
for item in iter(resq.get, 'DONE'):
results.append(item)
return results
def _parallel_execute(execfun, jobq, resq, counter, finished):
"""
"""
for item in iter(jobq.get, 'SENTINEL'):
item = execfun(item)
resq.put(item)
counter.value -= 1
print('C: {}'.format(counter.value))
if counter.value <= 0:
resq.put('DONE')
finished.set()
return
if __name__ == '__main__':
l = list(range(50))
l = mp_map(id, l, 2)
print('done')
sys.exit(0)
Выполнение вышеуказанного кода несколько раз приводит к следующему:
wks:~$ python3 mpmap.py
C: 1
C: 0
done
wks:~$ python3 mpmap.py
C: 1
C: 0
done
wks:~$ python3 mpmap.py
C: 1
C: 1
Traceback (most recent call last):
File "mpmap.py", line 55, in <module>
l = mp_map(id, l, 2)
File "mpmap.py", line 25, in mp_map
finished.wait()
File "/usr/lib/python3.2/multiprocessing/managers.py", line 1013, in wait
return self._callmethod('wait', (timeout,))
File "/usr/lib/python3.2/multiprocessing/managers.py", line 762, in _callmethod
kind, result = conn.recv()
KeyboardInterrupt
Основываясь на документации модуля multiprocessing
, я не понимаю, почему counter
небезопасен для процесса, поскольку доступ к нему осуществляется через Manager
, и он явно инициализируется с помощью lock=True
. Поскольку тупик случается время от времени, я не совсем уверен, как интерпретировать такое поведение. Мы очень ценим любые полезные идеи, спасибо.
РЕДАКТИРОВАТЬ: Просто так получилось, что я нашел объяснение после того, как погуглил еще немного; Я поделюсь этим здесь, если кому-то еще интересно: на основе этой записи в блоге 1, ссылка на которую приведена ниже, блокировка, выполненная в Python (т.е. в multiprocessing.[Manager].Value
с lock=True
), не приводит к атомарным операциям с общими значениями, как в примере. Решение состоит в том, чтобы использовать другую блокировку, совместно используемую процессами, которая используется для управления доступом к общим объектам.
[http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/]]»rel=
counter.value -= 1
не является атомарным, несмотря наlock=True
. См. Этот ответ: stackoverflow.com/a/1233363/3826372 - person Ross Ridge   schedule 28.07.2014