Создание фрейма данных dask из отложенных массивов dask

У меня есть список отложенных массивов dask, хранящихся в dask_arr_ls, которые я хочу превратить в фрейм данных dask. Вот скелет моего конвейера:

def simulate_device_data(num_id):
    # create data for unknown number of timestamps
    data_ls = [unknown_qty*[num_id, time, lon, lat]]
    device_arr = np.stack(data_ls)
    device_dask_arr = da.from_array(device_arr, chunks=device_arr.size)
    return device_dask_arr

    
dask_arr_ls = []
for i_device in range(n_devices):
    i_dask_arr = delayed(simulate_device_data)(i_device)
    dask_arr_ls.append(i_darr)
    
dask_arr_ls = [da.from_delayed(i_dask_arr, shape=(np.nan, 4), dtype=float) 
               for i_dask_arr in dask_arr_ls]
ddf = dd.concat([dd.from_dask_array(i_darr) for i_darr in darr_ls])
ddf.columns = ["num_id", "t", "lon", "lat"]
ddf.compute()

compute() выдает следующее сообщение об ошибке:

ValueError: DataFrame constructor not properly called!

Что я делаю неправильно?


person Chris Raper    schedule 21.01.2021    source источник
comment
Вы видели docs.dask.org/en / latest /?   -  person mdurant    schedule 21.01.2021


Ответы (1)


Я так и не понял, в чем была моя ошибка с кодом выше. Я подозреваю, что как-то неправильно использовал delayed. Я изменил свой конвейер следующим образом, чтобы заставить его работать.

def simulate_device_data(num_id):
    # create data for unknown number of timestamps
    data_ls = [unknown_qty*[num_id, time, lon, lat]]
    device_arr = np.stack(data_ls)
    device_df = pd.DataFrame(device_arr)
    return device_df

    
df_ls = []
for i_device in range(n_devices):
    i_df = delayed(simulate_device_data)(i_device)
    df_ls.append(i_df)
    
archetype_df = pd.DataFrame(None, columns=["name", "num_id", "t", "lon", "lat"])
archetype_df = archetype_df.astype({"name": "object", "num_id": "int64", "t": "datetime64[ns]", 
                                    "lon": "float64", "lat": "float64"},
                                   copy=False)
ddf = dd.from_delayed(df_ls, meta=archetype_df)
ddf.compute()
person Chris Raper    schedule 25.01.2021