Принудительная или явная перебалансировка данных с помощью dask.distributed

У меня есть кластер Dask-MPI с 4 рабочими, набор данных 3D-сетки, загруженный в массив Dask и разбитый на 4 блока. Мое приложение требует, чтобы я выполнял ровно одну задачу для каждого рабочего, и желательно с одним блоком на задачу. Проблема, с которой я столкнулся, - это надежное и воспроизводимое распределение блоков по кластеру. В частности, если я запускаю array.map_blocks (foo), foo запускается на одном и том же рабочем месте для каждого блока.

Client.rebalance () кажется, что он должен делать то, что я хочу, но он по-прежнему оставляет все или большинство блоков на одном и том же работнике. В качестве теста я попытался повторно разбить данные на 128 блоков и запустить снова, в результате чего 7 или 8 блоков переместились в другой набор данных. Это намекает на то, что Dask использует эвристику, чтобы решить, когда автоматически перемещать блоки, но не дает мне возможности принудительно распределить блоки равномерно.

Вот простой тестовый сценарий, который я пробовал (подключение к кластеру с 4 рабочими / рангами).

#connect to the Dask scheduler
from dask.distributed import Client, Sub, Pub, fire_and_forget
client = Client(scheduler_file='../scheduler.json', set_as_default=True)


#load data into a numpy array
import numpy as np
npvol = np.array(np.fromfile('/home/nleaf/data/RegGrid/Vorts_t50_128x128x128_f32.raw', dtype=np.float32))
npvol = npvol.reshape([128,128,128])

#convert numpy array to a dask array
import dask.array as da
ar = da.from_array(npvol).rechunk([npvol.shape[0], npvol.shape[1], npvol.shape[2]/N])


def test(ar):
    from mpi4py import MPI
    rank = MPI.COMM_WORLD.Get_rank()
    return np.array([rank], ndmin=3, dtype=np.int)

client.rebalance()
print(client.persist(ar.map_blocks(test, chunks=(1,1,1))).compute())

За несколько десятков тестовых прогонов этот код один раз вернул блок с рангом 3, а в противном случае все блоки были с рангом 0.


person nleaf    schedule 12.07.2019    source источник


Ответы (1)


Поскольку ваш общий набор данных не такой большой, первоначальный вызов from_array создает только один фрагмент, поэтому он переходит к одному рабочему (вы могли бы указать иное с помощью chunks=). Следующий повторный блок предпочитает не перемещать данные, если это возможно.

Предполагая, что к вашему файлу может получить доступ каждый из воркеров, вам лучше в первую очередь загружать чанки в воркеры.

Вам понадобятся такие функции, как

def get_chunk(fn, offset, count, shape, dtype):
    with open(fn, 'rb') as f:
        f.seek(offset)
        return np.fromfile(f, dtype=dtype, count=count).reshape(shape)

и передача разных смещений для каждого фрагмента.

parts = [da.from_delayed(dask.delayed(get_chunk)(fn, offset, count, shape, dtype), shape, dtype) for offset in [...]]
arr = da.concat(parts)

Это очень похоже на то, что автоматически делает npy источник во Intake, код: https://github.com/intake/intake/blob/master/intake/source/npy.py#L11

person mdurant    schedule 17.07.2019
comment
Спасибо! Я нашел другой источник, который указал, что client.rebalance() ничего не делает до тех пор, пока не будут загружены некоторые данные. Итак, я считаю, что правильная последовательность должна заключаться в загрузке с использованием вашего кода, тогда client.persist(arr); wait(arr); client.rebalance() Поскольку параллельная загрузка - это другая часть уравнения, которое мне не хватало, я принимаю ваш ответ. - person nleaf; 25.07.2019