У меня есть кластер Dask-MPI с 4 рабочими, набор данных 3D-сетки, загруженный в массив Dask и разбитый на 4 блока. Мое приложение требует, чтобы я выполнял ровно одну задачу для каждого рабочего, и желательно с одним блоком на задачу. Проблема, с которой я столкнулся, - это надежное и воспроизводимое распределение блоков по кластеру. В частности, если я запускаю array.map_blocks (foo), foo запускается на одном и том же рабочем месте для каждого блока.
Client.rebalance () кажется, что он должен делать то, что я хочу, но он по-прежнему оставляет все или большинство блоков на одном и том же работнике. В качестве теста я попытался повторно разбить данные на 128 блоков и запустить снова, в результате чего 7 или 8 блоков переместились в другой набор данных. Это намекает на то, что Dask использует эвристику, чтобы решить, когда автоматически перемещать блоки, но не дает мне возможности принудительно распределить блоки равномерно.
Вот простой тестовый сценарий, который я пробовал (подключение к кластеру с 4 рабочими / рангами).
#connect to the Dask scheduler
from dask.distributed import Client, Sub, Pub, fire_and_forget
client = Client(scheduler_file='../scheduler.json', set_as_default=True)
#load data into a numpy array
import numpy as np
npvol = np.array(np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32))
npvol = npvol.reshape([128,128,128])
#convert numpy array to a dask array
import dask.array as da
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2]/N])
def test(ar):
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
return np.array([rank], ndmin=3, dtype=np.int)
client.rebalance()
print(client.persist(ar.map_blocks(test, chunks=(1,1,1))).compute())
За несколько десятков тестовых прогонов этот код один раз вернул блок с рангом 3, а в противном случае все блоки были с рангом 0.