Dask - Как отменить и повторно отправить зависшие задачи?

Часто я сталкиваюсь с проблемой, когда Dask случайным образом останавливается на нескольких задачах, обычно связанных с чтением данных с другого узла в моей сети (подробнее об этом ниже). Это может произойти после нескольких часов работы скрипта без проблем. Он будет зависать на неопределенное время в форме, показанной ниже (в противном случае этот цикл занимает несколько секунд):

введите здесь описание изображения

В этом случае я вижу, что существует всего несколько остановленных процессов, и все они находятся на одном конкретном узле (192.168.0.228): введите описание изображения здесь

Каждый воркер на этом узле останавливается на паре задач read_parquet:

введите здесь описание изображения

Это было вызвано с использованием следующего кода и fastparquet:

ddf = dd.read_parquet(file_path, columns=['col1', 'col2'], index=False, gather_statistics=False)

В моем кластере работает Ubuntu 19.04 и все последние версии (по состоянию на 11/12) Dask и Distributed, а также необходимые пакеты (например, tornado, fsspec, fastparquet и т. Д.)

Данные, к которым пытается получить доступ узел .228, находятся на другом узле моего кластера. Узел .228 получает доступ к данным через общий доступ к файлам CIFS. Я запускаю планировщик Dask на том же узле, на котором я запускаю скрипт (отличается как от узла .228, так и от узла хранения данных). Скрипт подключает воркеров к планировщику через SSH с помощью Paramiko:

ssh_client = paramiko.SSHClient()
stdin, stdout, stderr = ssh_client.exec_command('sudo dask-worker ' +
                                                            ' --name ' + comp_name_decode +
                                                            ' --nprocs ' + str(nproc_int) +
                                                            ' --nthreads 10 ' +
                                                            self.dask_scheduler_ip, get_pty=True)  

Связь узла .228 с планировщиком и узлом хранения данных выглядит нормально. Возможно, у узла .228 возникла какая-то краткая проблема с подключением при попытке обработать задачу read_parquet, но если это произошло, подключение узла .228 к планировщику и общим ресурсам CIFS не пострадали после этого краткого момента. В любом случае журналы не показывают никаких проблем. Это весь журнал узла .228:

distributed.worker - INFO - Start worker at: tcp://192.168.0.228:42445

distributed.worker - INFO - Listening to: tcp://192.168.0.228:42445

distributed.worker - INFO - dashboard at: 192.168.0.228:37751

distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 2

distributed.worker - INFO - Memory: 14.53 GB

distributed.worker - INFO - Local Directory: /home/dan/worker-50_838ig

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Registered to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

Не говоря уже о том, является ли это ошибкой в ​​Dask или в моем коде / сети, можно ли установить общий тайм-аут для всех задач, обрабатываемых планировщиком? В качестве альтернативы можно:

  1. выявить зависшие задачи,
  2. скопировать остановившуюся задачу и переместить ее другому исполнителю, и
  3. отменить остановленную задачу?

person dan    schedule 13.11.2019    source источник


Ответы (1)


Можно ли установить общий тайм-аут для всех задач, выполняемых планировщиком?

По состоянию на 13.11.2019, к сожалению, нет.

Если задача завершилась ошибкой, вы можете повторить ее с помощью client.retry(...), но нет автоматического способа, чтобы задача завершилась сбоем через определенное время. Это то, что вам придется записать в свои функции Python самостоятельно. К сожалению, трудно прервать выполнение функции Python в другом потоке, поэтому отчасти это не реализовано.

Если рабочий выйдет из строя, тогда что-нибудь будет предпринято в другом месте. Однако из того, что вы говорите, похоже, что все в порядке, просто сами задачи, вероятно, займут вечность. К сожалению, это сложно идентифицировать как случай отказа.

person MRocklin    schedule 13.11.2019
comment
Было бы разумно попросить работника выключить или перезапустить, чтобы задачи были перенесены? - person mdurant; 13.11.2019
comment
да. Если все ошибочные задачи выполняются одним рабочим, то перезапуск этого рабочего должен решить проблему. - person MRocklin; 13.11.2019
comment
Спасибо вам обоим. client.retry не решает проблему - он не пытается повторно использовать проблемные ключи. Я также попытался закрыть и повторно подключить застрявшие рабочие процессы, но client.retire_workers () работает некорректно, поэтому кластер теряет сохраненные данные и не может восстановиться. Лучшее решение, которое я нашел, - это использование флагов --lifetime в команде dask-worker, когда планировщик корректно перезапускает этого рабочего и перемещает ключи / данные другому рабочему. Но, очевидно, это не нацелено на проблемного работника. Можно ли перезапустить воркера с использованием распределенного класса аналогично --lifetime? - person dan; 22.11.2019
comment
Я не могу быть уверен, но полагаю, что сталкиваюсь с проблемой, аналогичной dan (задокументировано здесь: github.com/dask/dask/issues/7543). Описание здесь наиболее похоже на другие случаи зависания кластеров или зависших задач, которые я обнаружил на GH или SO. Я понимаю логику того, что это не случай отказа. способ, которым в настоящее время определяется здоровая связь рабочего / планировщика. Если это так, @MRocklin, можете ли вы порекомендовать параметры конфигурации или аргументы для клиента, кластера, адаптивного или чего-либо еще, что сделало бы кластер более устойчивым к остановкам такого рода? - person NLi10Me; 13.04.2021