Dask отложен / массив dask нет ответа

У меня есть распределенная установка кластера dask, и я использовал ее для загрузки и преобразования кучи данных. Работает как шарм.

Я хочу использовать его для параллельной обработки. Вот моя функция

el = 5000
n_using = 26
n_across= 6

mat = np.random.random((el,n_using,n_across))
idx = np.tril_indices(n_across*2, -n_across)

def get_vals(c1, m, el, idx):
    m1 = m[c1,:,:]
    corr_vals = np.zeros((el, (n_across//2)*(n_across+1)))
    for c2 in range(c1+1, el):
        corr = np.corrcoef(m1.T, m[c2,:,:].T)
        corr_vals[c2] = corr[idx]
        
    return corr_vals

lazy_get_val = dask.delayed(get_vals, pure=True)

Вот однопроцессорная версия того, что я пытаюсь сделать:

arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)

Работает нормально, но занимает несколько часов. Вот мой способ сделать это в dask:

lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)

Даже если он запускается all_corr[1].compute(), он просто сидит и не отвечает. Когда я прерываю ядро, кажется, что оно застряло в /distributed/utils.py:

~ /.../ lib / python3.6 / site-packages / distribution / utils.py синхронно (цикл, func, * args, ** kwargs)

    249     else:
    250         while not e.is_set():
--> 251             e.wait(10)
    252     if error[0]:
    253         six.reraise(*error[0])

Есть предложения по отладке этого?


Другие вещи:

  • Если я запускаю его с меньшим mat (el = 1000), он работает нормально.
  • Если сделаю el = 5000, зависнет.
  • Если я прерву ядро ​​и снова запущу его с el = 1000, оно зависнет.

person Sid R    schedule 27.06.2018    source источник
comment
Могу я попросить вас предоставить для этого mcve? Вы близко, но я не знаю, какой коврик здесь должен быть. В идеале было бы идеально было бы что-то простое, что потенциальный задающий вопрос мог бы скопировать и скопировать, чтобы воспроизвести вашу проблему.   -  person MRocklin    schedule 27.06.2018
comment
Извините. Я его обновил. Еще раз спасибо за то, что изучили это.   -  person Sid R    schedule 27.06.2018
comment
@MRocklin - Есть идеи по этому поводу? Я переключился на joblib, и он работает нормально, но я бы предпочел использовать dask, так как у меня есть довольно приятный кластер.   -  person Sid R    schedule 28.06.2018


Ответы (1)


После добавления импорта в пример я запустил что-то, и построение графика было очень медленным. Это можно улучшить, избегая размещения массивов numpy непосредственно в отложенных вызовах следующим образом:

# mat = np.random.random((el,n_using,n_across))
# idx = np.tril_indices(n_across*2, -n_across)
mat = dask.delayed(np.random.random)((el,n_using,n_across))
idx = dask.delayed(np.tril_indices)(n_across*2, -n_across)

Или удалив ключевое слово pure=True для dask.delayed (когда вы устанавливаете pure = True, он должен хешировать содержимое всех входных данных, чтобы получить для них уникальный ключ, вы делаете это 5000 раз). Я выяснил это, профилировав ваш код с помощью магии %snakeviz в IPython.

Затем я запустил all_corr[1].compute(), и все было нормально. Затем я запустил all_corr.compute(), и казалось, что работа будет завершена, но не очень быстро. Я подозреваю, что либо ваши задачи слишком малы, так что накладных расходов слишком много, либо ваш код тратит слишком много времени на циклы Python for и поэтому сталкивается с проблемами GIL. Не уверен, что именно.

Следующее, что я бы порекомендовал попробовать, - это использовать планировщик dask.distributed, который лучше справится с проблемой GIL и усугубит проблему накладных расходов. Наблюдение за тем, как это работает, вероятно, поможет изолировать проблему.

person MRocklin    schedule 29.06.2018
comment
Привет, Мэтт - Спасибо за помощь. Создание mat и idx из отложенного случайного вызова решает их проблему, но не мою «настоящую» проблему. Я сгенерировал mat и idx как случайные матрицы для создания mcve, но для меня они получены из другого источника. Я пробовал это в планировщике dask, и, похоже, он отлично работает, думая, что это занимает 20 минут (против 4 минут с использованием joblib.Parallel). Задачи могут быть слишком маленькими - я буду придерживаться joblib. В очередной раз благодарим за помощь. - person Sid R; 09.07.2018