Я новичок в параллельной обработке и прошу приложение. Итак, у меня есть тысячи файлов, которые я хочу запускать параллельно, поэтому я использую для этого вычисления Dask. Мои рабочие и ядра распределены правильно. И я запускаю все в JuputerLab Environment.
Итак, вот моя попытка;
Подключение к шлюзу:
from dask_gateway import Gateway
gateway = Gateway(
address=address,
public_address=public_address,
auth="jupyterhub",
)
options = gateway.cluster_options()
options
Выбор рабочих не требуется:
cluster = gateway.new_cluster(
cluster_options=options,
)
cluster.adapt(minimum=10, maximum=50)
client = cluster.get_client()
cluster
client
В этом списке у меня есть куча моих gzip-файлов, внутри которых есть nc-файлы:
turb=glob.glob('Data/*')
И вот моя функция, которую я должен выполнять с этими файлами:
def get_turb(file):
name = str(file[5:18])
d=[file[5:9],file[9:11],file[11:13],file[14:16],file[16:18]]
f_zip = gzip.open(file, 'rb')
yr=d[0]
mo=d[1]
da=d[2]
hr=d[3]
mn=d[4]
fs = s3fs.S3FileSystem(anon=True)
period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da), freq='D')
# period.dayofyear
dy=period.dayofyear
cc=[7,8,9,10,11,12,13,14,15,16] #look at the IR channels only for now
dat = xr.open_dataset(f_zip)
А вот делаю отложенную задачу и вычисляю:
files = []
for grb_file in turb[:20]:
s3_ds = dask.delayed(get_turb)(grb_file)
files.append(s3_ds)
s3_ds.visualize()
files = dask.compute(*edr_files)
Dask delayed правильно генерирует объекты и добавляет их в файл, но при выполнении вычислений выдает ошибку:
FileNotFoundError: [Errno 2] No such file or directory: 'Data/20190107_0300.gz'
Там, где действительно есть файлы с тем же именем, я проверил их должным образом, и он работает отлично, без должной задержки функции dask. Может ли кто-нибудь посоветовать мне, что мне не хватает или что я делаю неправильно. Очень признателен за вашу помощь!