Я пытаюсь использовать Dask для распределения работы с машины (назовите ее A) на 4 серверах в центре обработки данных (назовите их B, C, D и E). A должен настроить SSHCluster, назначив планировщику жить на B, который затем должен порождать рабочих на B, C, D и E. Хитрость в том, что открыты только некоторые порты и, следовательно, должны быть указаны. Это легко сделать для планировщика, но я не могу заставить его работать для рабочих.
Если они не указаны, A успешно запускает планировщик на B. Планировщик считает, что успешно запускает всех рабочих на случайных портах, но при сборе результатов обнаруживает, что он может связываться только с рабочими на B. Пока это имеет смысл. Код для этого:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
})
Как только я пытаюсь установить порты для рабочих, он не запускает рабочие. Кажется, это происходит независимо от того, какой вклад я даю. Я попытался запустить по одному рабочему на каждом сервере, указав порт для использования как int:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
'port': 60000,
})
Я попытался запустить несколько воркеров на каждом сервере, указав диапазон используемых портов:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
'port': '{}:{}'.format(
60000, 60000 + procs_per_node - 1),
})
Я попытался запустить несколько воркеров на каждом сервере, предоставив полный диапазон доступных портов:
cluster = distributed.SSHCluster([scheduler_location] + list(worker_locations),
worker_options={
'nprocs': procs_per_node,
'nthreads': 1,
'port': '60000:61000'
})
Каждый раз он возвращает четыре ошибки (из B, C, D и E) с сообщением «Исключение: не удалось запустить рабочий процесс».
В общем, это мои вопросы:
- Как я могу назначить порты рабочим в Dask SSHCluster?
- Как только это будет сделано, нужно ли мне делать то же самое с процессами няни? Если да, то как?
Для справки, вот версии, которые я использую (возможно, не все они актуальны): python 3.8.3, dask 2.18.1, dask-core 2.18.1, распространенный 2.18.0, tornado 6.0.4, bokeh 2.01.