Применить ufunc к переменной одиночного набора данных xarray как отложенную операцию с помощью dask

Я хотел бы применить пользовательскую функцию к переменной в xarray.Dataset, изменяя только указанную переменную. В то же время я пытаюсь сделать эту часть графика вычислений dask, чтобы ее можно было отложить до считывания на диск с помощью to_netcdf.

На данный момент я могу применить ufunc с помощью xr.apply_ufunc(), но только ко всем переменным в наборе данных.

Я понимаю, что, вероятно, мог бы получить доступ к переменной напрямую, используя ее имя, например Dataset.var, и передать это apply_ufunc(), но я не совсем понимаю, как выходные данные этой функции (отложенное будущее) будут рекомбинированы с исходным набором данных перед выходом.

В идеале я хочу сделать что-то вроде этого (где 'data.nc' имеет несколько переменных и только var1 возведен в квадрат).

import xarray as xr
from distributed import Client

dask_client = Client()

def square(x):
    return x*x

data = xr.open_dataset('data.nc', chunks={'d1':10})
fut_sq = xr.apply_ufunc(square, data.var1, dask='parallelized', output_dtypes=['float'])
data.var1 = fut_sq.var1
fut_save = data.to_netcft('new.nc', compute=False)

dask_client.compute(fut_save)

person TonyH    schedule 13.07.2019    source источник


Ответы (1)


Поэтому я немного поигрался с этим и решил, что лучший способ сделать это - извлечь данные из файла netCDF4, преобразовать их в dask.array и затем переписать новый файл на диск. Это включает в себя написание пользовательских функций с использованием функции dask.delayed. Вероятно, использование подхода ufunc было неуместным для моей проблемы.

Несколько недостатков этого:

  1. Похоже, вы не можете изменить файл на месте. Чтобы сохранить измененные переменные из исходного файла NetCDF4, вам необходимо переписать весь файл на диск.
  2. По крайней мере, для меня лучший способ распараллелить пользовательскую функцию square - это создать свои собственные блоки данных и передать их по отдельности в square. Затем восстановите их, используя dask.array.concatenate. Я знаю, что у dask есть некоторые функции упаковки, но я изо всех сил пытался заставить его работать так, как я хотел.
  3. Чтение файла происходит параллельно, но не похоже, что dask записывает в NetCDF4 параллельно.

Было бы здорово, если бы меня можно было исправить по этим пунктам.

Вот мой измененный пример

import xarray as xr
from distributed import Client
import dask
import dask.array as da

dask_client = Client()

def bag_slices(ind, n=10):
    bag = list()
    prev = 0
    for i in range(len(ind)):
        if (i+1)%n == 0:
            bag.append(slice(prev, i+1, 1))
            prev = i+1
    if prev != i+1:
        bag.append(slice(prev, i+1, 1))
    return bag

@dask.delayed
def square(x):
    return x*x

@dask.delayed
def assign(old_xr_dataset, new_data):
    old_xr_dataset['var1'].values = new_data
    return old_xr_dataset

# for me data.data.var1 is 3D and I process by splitting the data along the second dimension.
with xr.open_dataset('data.nc', chunks={'d1':10}) as data:
    # create slice bags for distributed processing along preferred axis
    bags = bag_slices(data.coords['dim2'].values, n=10)
    # convert to dask array
    data_da = da.from_array(data.var1.values)
    # create data bags
    bags = [data_da[:, slc, :] for slc in bags]

    future_squared = []
    for data_bag in bags:
        # concatenate doesn't understand delayed objects 
        # so must convert them back to delayed arrays
        future_squared.append(da.from_delayed(square(data_bag), data_bag.shape, dtype=float))
    data_new = dask.array.concatenate(future_squared, axis=1)

    fut_dataset = assign(data, data_new)
    fut_nc_save = fut_dataset.to_netcdf('data_squared.nc', compute=False)
    fut_nc_save.compute()
person TonyH    schedule 17.07.2019