Как передать 2d-массив как multiprocessing.Array в multiprocessing.Pool?

Моя цель - передать родительский массив mp.Pool и заполнить его 2, распределяя его по разным процессам. Это работает для массивов 1 измерения:

import numpy as np
import multiprocessing as mp
import itertools


def worker_function(i=None):
    global arr
    val = 2
    arr[i] = val
    print(arr[:])


def init_arr(arr=None):
    globals()['arr'] = arr

def main():
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    mp.Pool(1, initializer=init_arr, initargs=(arr,)).starmap(worker_function, zip(range(5)))
    print(arr[:])


if __name__ == '__main__':
    main()

Выход:

[2, 0, 0, 0, 0]
[2, 2, 0, 0, 0]
[2, 2, 2, 0, 0]
[2, 2, 2, 2, 0]
[2, 2, 2, 2, 2]
[2, 2, 2, 2, 2]

Но как я могу сделать то же самое для x-мерных массивов? Добавление измерения к arr:

arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)

выдает ошибку:

Traceback (most recent call last):
  File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 23, in <module>
    main()
  File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 17, in main
    arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)
  File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\context.py", line 141, in Array
    ctx=self.get_context())
  File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 88, in Array
    obj = RawArray(typecode_or_type, size_or_initializer)
  File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 67, in RawArray
    result.__init__(*size_or_initializer)
TypeError: only size-1 arrays can be converted to Python scalars

Смена dtype на arr тоже не помогает.


person Artur Müller Romanov    schedule 06.10.2020    source источник


Ответы (1)


Вы не можете напрямую использовать multiprocessing.Array в качестве двумерного массив, но в одномерной памяти второе измерение все равно всего лишь иллюзия :).

К счастью, numpy позволяет читать массив из буфера и изменять его без необходимость его копирования. В приведенной ниже демонстрации я просто использую отдельный замок, чтобы мы могли наблюдать изменения, сделанные шаг за шагом, в настоящее время нет условий гонки для того, что он делает.

import multiprocessing as mp
import numpy as np    

def worker_function(i):
    global arr, arr_lock
    val = 2
    with arr_lock:
        arr[i, :i+1] = val
        print(f"{mp.current_process().name}\n{arr[:]}")


def init_arr(arr, arr_lock=None):
    globals()['arr'] = np.frombuffer(arr, dtype='int32').reshape(5, 5)
    globals()['arr_lock'] = arr_lock


def main():
    arr = mp.Array('i', np.zeros(5 * 5, dtype='int32'), lock=False)
    arr_lock = mp.Lock()

    mp.Pool(2, initializer=init_arr, initargs=(arr, arr_lock)).map(
        worker_function, range(5)
    )

    arr = np.frombuffer(arr, dtype='int32').reshape(5, 5)
    print(f"{mp.current_process().name}\n{arr}")


if __name__ == '__main__':
    main()

Выход:

ForkPoolWorker-1
[[2 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
 [2 2 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [2 2 2 2 0]
 [0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [2 2 2 2 0]
 [2 2 2 2 2]]
MainProcess
[[2 0 0 0 0]
 [2 2 0 0 0]
 [2 2 2 0 0]
 [2 2 2 2 0]
 [2 2 2 2 2]]

Process finished with exit code 0
person Darkonaut    schedule 06.10.2020