Как правильно установить рабочие порты в SSHCluster, распределенном по Dask?

Я пытаюсь использовать Dask для распределения работы с машины (назовите ее A) на 4 серверах в центре обработки данных (назовите их B, C, D и E). A должен настроить SSHCluster, назначив планировщику жить на B, который затем должен порождать рабочих на B, C, D и E. Хитрость в том, что открыты только некоторые порты и, следовательно, должны быть указаны. Это легко сделать для планировщика, но я не могу заставить его работать для рабочих.

Если они не указаны, A успешно запускает планировщик на B. Планировщик считает, что успешно запускает всех рабочих на случайных портах, но при сборе результатов обнаруживает, что он может связываться только с рабочими на B. Пока это имеет смысл. Код для этого:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                 })

Как только я пытаюсь установить порты для рабочих, он не запускает рабочие. Кажется, это происходит независимо от того, какой вклад я даю. Я попытался запустить по одному рабочему на каждом сервере, указав порт для использования как int:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': 60000,
                                 })

Я попытался запустить несколько воркеров на каждом сервере, указав диапазон используемых портов:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': '{}:{}'.format(
                                         60000, 60000 + procs_per_node - 1),
                                 })

Я попытался запустить несколько воркеров на каждом сервере, предоставив полный диапазон доступных портов:

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'port': '60000:61000'
                                 })

Каждый раз он возвращает четыре ошибки (из B, C, D и E) с сообщением «Исключение: не удалось запустить рабочий процесс».

В общем, это мои вопросы:

  • Как я могу назначить порты рабочим в Dask SSHCluster?
  • Как только это будет сделано, нужно ли мне делать то же самое с процессами няни? Если да, то как?

Для справки, вот версии, которые я использую (возможно, не все они актуальны): python 3.8.3, dask 2.18.1, dask-core 2.18.1, распространенный 2.18.0, tornado 6.0.4, bokeh 2.01.


person Luca    schedule 17.06.2020    source источник


Ответы (1)


Кажется, это нормально работает при использовании worker_port вместо port

cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
                                 worker_options={
                                     'nprocs': procs_per_node, 
                                     'nthreads': 1,
                                     'worker_port': '60000:61000'
                                 })

https://github.com/dask/distributed/blob/93701f82c2cef46d4e68696bf48af0fc65ea9159/distributed/cli/dask_worker.py#L54

person abduh    schedule 27.06.2020