Почему этот небольшой фрагмент зависает при использовании многопроцессорной обработки с maxtasksperchild, numpy.random.randint и numpy.random.seed?

У меня есть скрипт python, который одновременно случайным образом обрабатывает массивы и изображения numpy. Чтобы иметь правильную случайность внутри порожденных процессов, я передаю случайное семя из основного процесса рабочим для их заполнения.

Когда я использую maxtasksperchild для Pool, мой скрипт зависает после запуска Pool.map несколько раз.

Ниже приведен минимальный фрагмент, который воспроизводит проблему:

# This code stops after multiprocessing.Pool workers are replaced one single time.
# They are replaced due to maxtasksperchild parameter to Pool
from multiprocessing import Pool
import numpy as np

def worker(n):
    # Removing np.random.seed solves the issue
    np.random.seed(1) #any seed value
    return 1234 # trivial return value

# Removing maxtasksperchild solves the issue
ppool = Pool(20 , maxtasksperchild=5)
i=0
while True:
    i += 1
    # Removing np.random.randint(10) or taking it out of the loop solves the issue
    rand = np.random.randint(10)
    l  = [3] # trivial input to ppool.map
    result = ppool.map(worker, l)
    print i,result[0]

это результат

1 1234
2 1234
3 1234
.
.
.
99 1234
100 1234 # at this point workers should've reached maxtasksperchild tasks
101 1234
102 1234
103 1234
104 1234
105 1234
106 1234
107 1234
108 1234
109 1234
110 1234

потом зависает на неопределенный срок.

Я потенциально мог бы заменить numpy.random на random Python и избавиться от проблемы. Однако в моем реальном приложении рабочий процесс будет выполнять пользовательский код (предоставленный в качестве аргумента рабочему процессу), над которым я не контролирую, и хотел бы разрешить использование numpy.random функций в этом пользовательском коде. Поэтому я намеренно хочу засеять глобальный генератор случайных чисел (для каждого процесса независимо).

Это было протестировано с Python 2.7.10, numpy 1.11.0, 1.12.0 и 1.13.0, Ubuntu и OSX.


person MohamedEzz    schedule 11.06.2017    source источник
comment
Не удается воспроизвести на Ideone с пулом из двух процессов. (Ideone не позволил бы мне использовать 20.) Зависят ли ваши результаты от размера пула?   -  person user2357112 supports Monica    schedule 13.06.2017
comment
Может быть... Я только что повторил это, и он зависает для пула с 7+ работниками, но зависает в разное время при каждом запуске. Так выглядит состояние гонки, которое становится более заметным по мере увеличения количества рабочих.   -  person MohamedEzz    schedule 13.06.2017


Ответы (3)


Оказывается, это происходит из-за ошибочного взаимодействия Python между threading.Lock и multiprocessing.

np.random.seed и большинство функций np.random.* используют threading.Lock для обеспечения потокобезопасности. Функция np.random.* генерирует случайное число, а затем обновляет начальное число (распределенное по потокам), поэтому необходима блокировка. См. np.random.seed. и cont0_array (используется np.random.random() и другие).

Теперь, как это вызывает проблему в приведенном выше фрагменте?

В двух словах, фрагмент зависает, потому что состояние threading.Lock наследуется при разветвлении. Таким образом, когда дочерний блок разветвляется в то же время, когда блокируется родитель (в np.random.randint(10)), дочерний блокируется (в np.random.seed).

@njsmith объясняет это в этой проблеме github https://github.com/numpy/numpy/issues/9248#issuecomment-308054786

multiprocessing.Pool порождает фоновый поток для управления рабочими процессами: >https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L170-L173

Он зацикливается в фоновом режиме, вызывая _maintain_pool: github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L366

Если рабочий процесс завершается, например, из-за ограничения maxtasksperchild, то _maintain_pool вызывает _repopulate_pool: ="nofollow noreferrer">https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L240

А затем _repopulate_pool разветвляет несколько новых рабочих процессов, все еще находящихся в этом фоновом потоке: ">https://github.com/python/cpython/blob/aefa7ebf0ff0f73feee7ab24f4cdcb2014d83ee5/Lib/multiprocessing/pool.py#L224

Итак, что происходит, так это то, что в конечном итоге вам не повезло, и в тот же момент, когда ваш основной поток вызывает некоторую функцию np.random и удерживает блокировку, многопроцессорность решает разветвить дочерний элемент, который начинается с уже удерживаемой блокировки np.random, но нить, которая его удерживала, исчезла. Затем дочерний процесс пытается вызвать np.random, что требует блокировки, и поэтому дочерний блокируется.

Простой обходной путь здесь — не использовать fork с многопроцессорной обработкой. Если вы используете методы запуска spawn или forkserver, это должно исчезнуть.

Для правильного исправления .... тьфу. Я думаю, нам нужно зарегистрировать обработчик pthread_atfork перед разветвлением, который берет блокировку np.random перед разветвлением, а затем освобождает ее после этого? И действительно, я думаю, нам нужно сделать это для каждой блокировки в numpy, что требует чего-то вроде сохранения слабого набора каждого объекта RandomState, и _FFTCache также имеет блокировку...

(С положительной стороны, это также дало бы нам возможность повторно инициализировать глобальное случайное состояние в дочернем элементе, что мы действительно должны делать в тех случаях, когда пользователь не запустил его явно.)

person MohamedEzz    schedule 13.06.2017
comment
Я подумал, что это будет одна из тех уродливых проблем с потоками/форками без exec. Fork-without-exec в целом довольно хрупок. - person user2357112 supports Monica; 13.06.2017

Использование numpy.random.seed не является потокобезопасным. numpy.random.seed изменяет значение семени глобально, в то время как, насколько я понимаю, вы пытаетесь изменить семя локально.

См. документацию.

Если действительно то, чего вы пытаетесь достичь, это заполнение генератора в начале каждого работника, следующее решение:

def worker(n):
    # Removing np.random.seed solves the problem                                                               
    randgen = np.random.RandomState(45678) # RandomState, not seed!
    # ...Do something with randgen...                                           
    return 1234 # trivial return value                                                                         
person matteo    schedule 11.06.2017
comment
Спасибо. Однако np.random.RandomState(1) не заполняет генератор случайных чисел. Также не могли бы вы указать мне, где в документах говорится о потокобезопасности numpy.random.seed, я не смог найти - person MohamedEzz; 12.06.2017
comment
Если под безопасностью потоков вы имели в виду, что каждый процесс будет генерировать одни и те же случайные числа... да, именно поэтому я передаю случайное семя из основного процесса каждому рабочему - person MohamedEzz; 12.06.2017
comment
Это зависит от того, чего вы хотите достичь. Если у каждого рабочего есть новый генератор, засеянный 45678, это работает. Насколько я понимаю, randgen теперь эквивалентен numpy.random, засеянному 45678. - person matteo; 12.06.2017
comment
Опять же, насколько я понимаю, numpy.seed изменяет глобальное начальное число каждый раз, когда оно выполняется. Я не уверен, но я думаю, что это означает, что во время выполнения одного потока вы меняете начальное значение, которое повлияет на другие потоки. - person matteo; 12.06.2017
comment
numpy.seed меняет семя глобально. глобально в том смысле, что все последующие вызовы функций numpy.random будут зависеть от этого семени. Но каждый процесс полностью независим друг от друга благодаря управлению процессами в Unix. Каждый процесс имеет свою собственную копию памяти и живет в своем собственном мире, если только сокеты/очереди/..etc явно не используются для IPC. - person MohamedEzz; 12.06.2017
comment
Мои знания о внутренней работе numpy ограничены, извините за это :). Однако, что касается части вопроса, поэтому я намеренно хочу засеять глобальный генератор случайных чисел... мое решение работает в вашем примере. Вы пробовали это в более общем примере? - person matteo; 12.06.2017
comment
Вы имеете в виду, что использование randgen решает эту проблему? это сторонняя библиотека? Как уже упоминалось, у меня есть пользовательский код, работающий внутри рабочих, поэтому мне нужно, чтобы numpy был специально засеян - person MohamedEzz; 12.06.2017
comment
Давайте продолжим обсуждение в чате. - person matteo; 12.06.2017

Делая это полноценным ответом, поскольку он не подходит для комментария.

Немного поиграв, что-то здесь пахнет ошибкой numpy.random. Мне удалось воспроизвести ошибку зависания, и, кроме того, были некоторые другие странные вещи, которые не должны были происходить, например, ручное заполнение генератора не работало.

def rand_seed(rand, i):
    print(i)
    np.random.seed(i)
    print(i)
    print(rand())
def test1():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed, (np.random.random_sample, i)).get()
        for i in range(5)]
test1()

имеет вывод

0
0
0.3205032737431185
1
1
0.3205032737431185
2
2
0.3205032737431185
3
3
0.3205032737431185
4
4
0.3205032737431185

С другой стороны, отсутствие передачи np.random.random_sample в качестве аргумента работает просто отлично.

def rand_seed2(i):
    print(i)
    np.random.seed(i)
    print(i)
    print(np.random.random_sample())
def test2():
    with multiprocessing.Pool() as pool:
        [pool.apply_async(rand_seed, (i,)).get()
        for i in range(5)]
test2()

имеет вывод

0
0
0.5488135039273248
1
1
0.417022004702574
2
2
0.43599490214200376
3
3
0.5507979025745755
4
4
0.9670298390136767

Это говорит о том, что за кулисами происходит серьезное дурачество. Хотя не знаю, что еще сказать об этом....

По сути, кажется, что numpy.random.seed изменяет не только переменную «начального состояния», но и саму функцию random_sample.

person brunobeltran0    schedule 12.06.2017
comment
Я не уверен, почему вы передаете random_sample рабочим. Но вроде передает функцию со своим состоянием (RandomState). Я удивлен, что передача random_sample вообще сработала, поскольку это метод экземпляра, который нельзя выбрать и, следовательно, нельзя пройти через очередь (используемую пулом). Но поскольку это класс cython, все может быть по-другому. random_sample определяется здесь github.com/numpy/ numpy/blob/ Это интересная проблема, но я считаю, что она не совсем связана с моей проблемой - person MohamedEzz; 13.06.2017