Часто я сталкиваюсь с проблемой, когда Dask случайным образом останавливается на нескольких задачах, обычно связанных с чтением данных с другого узла в моей сети (подробнее об этом ниже). Это может произойти после нескольких часов работы скрипта без проблем. Он будет зависать на неопределенное время в форме, показанной ниже (в противном случае этот цикл занимает несколько секунд):
В этом случае я вижу, что существует всего несколько остановленных процессов, и все они находятся на одном конкретном узле (192.168.0.228):
Каждый воркер на этом узле останавливается на паре задач read_parquet:
Это было вызвано с использованием следующего кода и fastparquet:
ddf = dd.read_parquet(file_path, columns=['col1', 'col2'], index=False, gather_statistics=False)
В моем кластере работает Ubuntu 19.04 и все последние версии (по состоянию на 11/12) Dask и Distributed, а также необходимые пакеты (например, tornado, fsspec, fastparquet и т. Д.)
Данные, к которым пытается получить доступ узел .228, находятся на другом узле моего кластера. Узел .228 получает доступ к данным через общий доступ к файлам CIFS. Я запускаю планировщик Dask на том же узле, на котором я запускаю скрипт (отличается как от узла .228, так и от узла хранения данных). Скрипт подключает воркеров к планировщику через SSH с помощью Paramiko:
ssh_client = paramiko.SSHClient()
stdin, stdout, stderr = ssh_client.exec_command('sudo dask-worker ' +
' --name ' + comp_name_decode +
' --nprocs ' + str(nproc_int) +
' --nthreads 10 ' +
self.dask_scheduler_ip, get_pty=True)
Связь узла .228 с планировщиком и узлом хранения данных выглядит нормально. Возможно, у узла .228 возникла какая-то краткая проблема с подключением при попытке обработать задачу read_parquet, но если это произошло, подключение узла .228 к планировщику и общим ресурсам CIFS не пострадали после этого краткого момента. В любом случае журналы не показывают никаких проблем. Это весь журнал узла .228:
distributed.worker - INFO - Start worker at: tcp://192.168.0.228:42445
distributed.worker - INFO - Listening to: tcp://192.168.0.228:42445
distributed.worker - INFO - dashboard at: 192.168.0.228:37751
distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.167:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 14.53 GB
distributed.worker - INFO - Local Directory: /home/dan/worker-50_838ig
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.167:8786
distributed.worker - INFO - -------------------------------------------------
Не говоря уже о том, является ли это ошибкой в Dask или в моем коде / сети, можно ли установить общий тайм-аут для всех задач, обрабатываемых планировщиком? В качестве альтернативы можно:
- выявить зависшие задачи,
- скопировать остановившуюся задачу и переместить ее другому исполнителю, и
- отменить остановленную задачу?