Не могу обучить модель Keras с помощью Dask?

Я ожидал от простых примеров с использованием задержки Dask. Я читал, что могу по существу воспроизвести gridsearchcv из scikit-learn с помощью нескольких вызовов функций следующим образом. Похоже, что модель никогда не подходит (model.fit(...)) потому что остальная часть цикла продолжается (pred(...))?

Есть ли проблема с тем, как я вставляю функции? Я знаю, что для dask существует gridsearchcv, но проблема в том, что моя настоящая модель представляет собой Keras LSTM с несколькими входами, и вы не можете передать трехмерный массив как «X». Код отлично работает в сериале без Dask.

Вот небольшой воспроизводимый пример:

import dask
import pandas as pd
import numpy as np
from sklearn.datasets import load_boston
from sklearn.model_selection import KFold,ParameterGrid
from sklearn.metrics import mean_squared_error 
from keras import Sequential
from keras.layers import Dense

boston = load_boston()
y=boston.target
X=boston.data


@dask.delayed
def create_model(dense_nodes):
    model = Sequential()
    model.add(Dense(dense_nodes, input_dim=13, kernel_initializer='normal', activation='relu'))
    model.add(Dense(1, kernel_initializer='normal'))
    # Compile model
    model.compile(loss='mean_squared_error', optimizer='adam')
    return model

@dask.delayed
def cv_model(X,y,kf,params_dct):

    dense_nodes = params_dct['dense']

    hold_actual=np.zeros((X.shape[0],1))
    hold_preds=np.zeros((X.shape[0],1))

    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        model=create_model(dense_nodes)
        model.fit(X_train,y_train,batch_size=64, epochs=5)
        pred=model.predict(X_test)
        hold_actual[test_index,0]=y_test.ravel()
        hold_preds[test_index,0]=pred.ravel()

    return(mean_squared_error(hold_actual,hold_preds))



kfold=KFold(n_splits=3,random_state=4521)
grid=ParameterGrid({'dense':[2,3,4,5,6,7,8,9,10]})

output=[]
for i in grid:
    output.append(cv_model(X,y,kfold,grid[0]))

total=dask.delayed(output)
total.compute()




---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-53-2116b76de18c> in <module>()
     52 
     53 total=dask.delayed(output)
---> 54 total.compute()

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    153         dask.base.compute
    154         """
--> 155         (result,) = compute(self, traverse=False, **kwargs)
    156         return result
    157 

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    402     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    403                     else (None, a) for a in args]
--> 404     results = get(dsk, keys, **kwargs)
    405     results_iter = iter(results)
    406     return tuple(a if f is None else f(next(results_iter), *a)

~/anaconda3/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

~/anaconda3/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~/anaconda3/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     65         if exc.__traceback__ is not tb:
     66             raise exc.with_traceback(tb)
---> 67         raise exc
     68 
     69 else:

~/anaconda3/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~/anaconda3/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

<ipython-input-53-2116b76de18c> in cv_model(X, y, kf, params_dct)
     38         pred=model.predict(X_test)
     39         hold_actual[test_index,0]=y_test.ravel()
---> 40         hold_preds[test_index,0]=pred.ravel()
     41 
     42     return(mean_squared_error(hold_actual,hold_preds))

ValueError: setting an array element with a sequence.

ДОБАВИТЬ №1

Вот вторая попытка, ошибка осталась.

import dask
import pandas as pd
import numpy as np
from sklearn.datasets import load_boston
from sklearn.model_selection import KFold,ParameterGrid
from sklearn.metrics import mean_squared_error 
from keras import Sequential
from keras.layers import Dense
import tensorflow as tf
boston = load_boston()
y=boston.target
X=boston.data

import tensorflow as tf


#You never want to call delayed functions from within other delayed functions
 #https://stackoverflow.com/questions/51219354/cant-train-keras-model-with-dask

@dask.delayed
def create_model(dense_nodes):
    model = Sequential()
    model.add(Dense(dense_nodes, input_dim=13, kernel_initializer='normal', activation='relu'))
    model.add(Dense(1, kernel_initializer='normal'))
    # Compile model
    model.compile(loss='mean_squared_error', optimizer='adam')
    return model


def cv_model(X,y,kf,params_dct):

    dense_nodes = params_dct['dense']

    hold_actual=np.zeros((X.shape[0],1))
    hold_preds=np.zeros((X.shape[0],1))


    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]


        model=create_model(dense_nodes)
        model.fit(X_train,y_train,batch_size=64, epochs=5)

        pred=model.predict(X_test)

        hold_actual[test_index,0]=y_test.ravel()
        hold_preds[test_index,0]=pred.ravel()
    return(dask.delayed(mean_squared_error(hold_actual,hold_preds)))



kfold=KFold(n_splits=3,random_state=4521)
grid=ParameterGrid({'dense':[2,3,4,5,6,7,8,9,10]})

output=[]
for i in grid:
    delayed_value=cv_model(X,y,kfold,grid[0])

result=delayed_value.compute()

ДОБАВИТЬ №2

Оказывается, у Keras/TF есть проблема, которая вызывает ошибку вне Dask. Об этом я расскажу в отдельном вопросе. Итак, я заменил модель Keras на модель Xgboost, чтобы обеспечить правильную настройку Dask для этой цели.

Вот этот код. Я обнаружил, что мне нужно закомментировать вызов Dask, задержанный в бите mean_squared_error.

import dask
import pandas as pd
import numpy as np
from sklearn.datasets import load_boston
from sklearn.model_selection import KFold,ParameterGrid
from sklearn.metrics import mean_squared_error 

import xgboost as xgb





boston = load_boston()
y=boston.target
X=boston.data


@dask.delayed
def cv_model(X,y,kf,params_dct):


    hold_actual=np.zeros((X.shape[0],1))
    hold_preds=np.zeros((X.shape[0],1))

    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]


        dtrain=xgb.DMatrix(data=X_train, label=y_train)
        dtest=xgb.DMatrix(data=X_test, label=y_test)

        regmod = xgb.train(params_dct, dtrain, 10)
        pred=regmod.predict(dtest)
        hold_actual[test_index,0]=y_test.ravel()
        hold_preds[test_index,0]=pred.ravel()

    #return(dask.delayed(mean_squared_error)(np.array(hold_actual),np.array(hold_preds)))
    return({'result':mean_squared_error(np.array(hold_actual),np.array(hold_preds)),'param':params_dct})


kfold=KFold(n_splits=3,random_state=4521)
grid=ParameterGrid({'max_depth':[2,3,4,5,6,7,8,9,10], 'eta':[0.01,0.05], 'min_child_weight': [1,2,3,4,5]})

output=[]
for i in grid:
    output.append(cv_model(X,y,kfold,i))

total=dask.delayed(output)
result=total.compute()

person B_Miner    schedule 07.07.2018    source источник


Ответы (1)


Вы не хотите вызывать dask.delayed для функции cv_model. Вы никогда не захотите вызывать отложенные функции из других отложенных функций. Вместо этого функции, которые вызывают отложенные функции, часто очень быстры (они не выполняют никакой работы), и поэтому вы хотите вызывать их немедленно, а не лениво.

Похоже, что ваш цикл for лениво создает множество моделей, вызывает методы этих моделей (которые также будут ленивыми), а затем вызывает mean_squared_error для результатов. Эта функция, вероятно, также должна быть отмечена как отложенная, например

return dask.delayed(mean_squared_error)(hold_actual, hold_preds))

Затем, если вы удалите отложенный декоратор из cv_model, вы сможете сделать что-то вроде:

delayed_value = cv_model(...)
result = delayed_value.compute()

Во втором примере вы вызываете model.fit без использования возвращаемого значения:

    model=create_model(dense_nodes)
    model.fit(X_train,y_train,batch_size=64, epochs=5)
    pred=model.predict(X_test)

Delayed не работает на месте, поэтому вызов model.fit сам по себе ничего не даст. Вы, вероятно, хотите

model = model.fit(...)

Здесь вы вызываете dask.delayed для результата, а не для функции mean_squared_error

return(dask.delayed(mean_squared_error(hold_actual,hold_preds)))

См. https://github.com/dask/dask/pull/3737 для новых документы

person MRocklin    schedule 07.07.2018
comment
Если я правильно понял, то ошибка осталась. Я написал вторую попытку выше. - person B_Miner; 07.07.2018
comment
Смотрите обновление. Также см. новые документы, на которые это вдохновило: github.com/dask/dask/pull/3737 - person MRocklin; 08.07.2018
comment
Можешь глянуть выше? Я обнаружил, что с Keras есть проблема, поэтому я использовал ее для простой модели xgboost, просто чтобы убедиться, что у меня правильный процесс. Что вы думаете? Я обнаружил, что мне нужно закомментировать отложенный вызов mean_squared_error или были возвращены только задержанные объекты? - person B_Miner; 13.07.2018
comment
Боюсь, что это неправильно, так как с Dask он работает медленнее, чем без него. - person B_Miner; 14.07.2018