Как с помощью MLflow создать пользовательский pyfunc для прогнозирования моделью, входные данные которой имеют более двух измерений?

Я новичок в TensorFlow и MLflow, и у меня проблема, аналогичная той, что описана здесь. Я реализую модель TensorFlow для прогнозирования значений временных рядов. Для этого я использовал mlflow.tensorflow.autolog() из MLflow, чтобы отслеживать и обслуживать модели в моём экземпляре. Однако, поскольку форма моих входных данных имеет более двух измерений, я не смог использовать этот метод.

Как было предложено ранее, я попытался кодировать и декодировать входные данные на этапе прогнозирования, используя для этого пользовательский pyfunc.

Таким образом, у меня есть файл model_test.py с классом, метод predict которого декодирует входные данные, а именно:

import sys
import os
import json
import mlflow
import numpy as np
import pandas as pd
from mlflow.pyfunc import PythonModel
import tensorflow as tf
import base64



class ModelTest(PythonModel):
    """Custom MLflow ``PythonModel`` wrapping a Conv1D + LSTM forecaster.

    Typical use, as shown by the training script: build the windowed
    training set with :meth:`windowed_dataset`, fit with :meth:`train`,
    then call :meth:`predict` with a base64-encoded series.

    NOTE(review): ``mlflow.pyfunc.save_model`` pickles this instance with
    cloudpickle. The TensorFlow objects stored on ``self`` (``windowed_ds``
    and ``modelo``) are presumably what fails to serialise -- confirm, and
    consider saving the Keras model as an artifact instead.
    """

    def __init__(self, estimator=None, window_size=64, batch_size=256,
                 shuffle_buffer_size=100):
        """Store the windowing and batching hyper-parameters.

        Args:
            estimator: Accepted for interface compatibility; not used here.
            window_size: Number of time steps per window; used by
                :meth:`predict` to slice the incoming series.
            batch_size: Stored on the instance; not read by this class.
            shuffle_buffer_size: Stored on the instance; not read by this
                class.
        """
        # CODE TO CREATE THE EXPERIMENT
        self.window_size = window_size
        self.batch_size = batch_size
        self.shuffle_buffer_size = shuffle_buffer_size

    def windowed_dataset(self, series, window_size, batch_size, shuffle_buffer):
        """Build the shuffled, batched training dataset and keep it on ``self``.

        Each window holds ``window_size + 1`` consecutive values; the inputs
        are the window without its last value and the targets are the same
        window shifted by one step.

        Args:
            series: 1-D sequence of values; a trailing feature axis is added.
            window_size: Number of input time steps per example.
            batch_size: Number of windows per batch.
            shuffle_buffer: Buffer size passed to ``Dataset.shuffle``.

        Returns:
            ``self``; the dataset is stored in ``self.windowed_ds``.
        """
        series = tf.expand_dims(series, axis=-1)
        ds = tf.data.Dataset.from_tensor_slices(series)
        ds = ds.window(window_size + 1, shift=1, drop_remainder=True)
        # window() yields nested datasets; flatten each one into a tensor.
        ds = ds.flat_map(lambda w: w.batch(window_size + 1))
        ds = ds.shuffle(shuffle_buffer)
        ds = ds.map(lambda w: (w[:-1], w[1:]))
        self.windowed_ds = ds.batch(batch_size).prefetch(1)
        return self

    def train(self, train_set, y=None, epochs=500):
        """Build, compile and fit the Keras model; keep it in ``self.modelo``.

        Args:
            train_set: Training input passed to ``model.fit`` as ``x``
                (the script passes ``self.windowed_ds``).
            y: Optional targets passed to ``model.fit``; leave ``None`` when
                ``train_set`` already yields ``(inputs, targets)`` pairs.
            epochs: Number of training epochs. Previously this argument was
                ignored and training always ran for 5 epochs.

        Returns:
            ``self``, with the fitted model in ``self.modelo``.
        """
        model = tf.keras.models.Sequential([
            tf.keras.layers.Conv1D(filters=60, kernel_size=5,
                                   strides=1, padding="causal",
                                   activation="relu",
                                   input_shape=[None, 1]),
            tf.keras.layers.LSTM(60, return_sequences=True),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(1),
            # Scale the network output by a fixed factor of 400.
            tf.keras.layers.Lambda(lambda x: x * 400),
        ])
        # `learning_rate` is the current name of the deprecated `lr` alias.
        optimizer = tf.keras.optimizers.SGD(learning_rate=1e-6, momentum=0.9)
        model.compile(loss=tf.keras.losses.Huber(), optimizer=optimizer,
                      metrics=["mae"])
        model.fit(x=train_set, y=y, epochs=epochs)
        self.modelo = model

        return self

    def predict(self, series_encoded):
        """Decode a base64-encoded series and forecast over sliding windows.

        NOTE(review): MLflow documents ``PythonModel.predict`` as
        ``predict(context, model_input)``; the single-argument signature is
        kept for existing callers -- confirm how the model will be served.

        Args:
            series_encoded: Base64 string/bytes of the raw buffer of a 1-D
                array (read with ``np.frombuffer``'s default dtype, float64).

        Returns:
            Whatever ``self.modelo.predict`` returns for the windowed input.
        """
        # Decode the data that arrives to the method
        series = np.frombuffer(base64.b64decode(series_encoded))
        # Preprocess data: add the feature axis expected by Conv1D.
        series = np.expand_dims(series, axis=1)
        # Use the configured window instead of a hard-coded length.
        window_size = self.window_size
        ds = tf.data.Dataset.from_tensor_slices(series)
        ds = ds.window(window_size, shift=1, drop_remainder=True)
        ds = ds.flat_map(lambda w: w.batch(window_size))
        ds = ds.batch(32).prefetch(1)
        # Prediction
        return self.modelo.predict(ds)

И файл run.py для обучения и сохранения модели:

import os
import mlflow.pyfunc
import ModelTest as model_test
import sys
import json
import mlflow
import numpy as np
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import Preprocessing as pre

#@click(...) # define the click options according to MLproject file
def run():
    """Train the windowed forecaster and save it as an MLflow pyfunc model.

    The series is split at ``split_time``; the first part is turned into a
    windowed dataset, the model is fitted once inside an MLflow run, and the
    fitted wrapper is saved under ``models/<run_id>``.
    """
    # Code to load time series data from MongoDB and preprocess it

    window_size = 64
    batch_size = 256
    shuffle_buffer_size = 100
    split_time = 400

    series = np.array(data_df['sensor_ts'])
    time = np.array(data_df['time'])
    time_train = time[:split_time]
    x_train = series[:split_time]
    # Held-out part of the series; not used further in this function.
    time_valid = time[split_time:]
    x_valid = series[split_time:]

    # The model module is imported under the alias `model_test`.
    modelo = model_test.ModelTest()
    modelo.windowed_dataset(x_train, window_size, batch_size, shuffle_buffer_size)

    # `active_run` avoids shadowing this function's own name.
    with mlflow.start_run() as active_run:
        # train() returns the wrapper itself, so train once and reuse it.
        model = modelo.train(modelo.windowed_ds)
        model_path = os.path.join('models', active_run.info.run_id)

        # Save model
        mlflow.pyfunc.save_model(
            path=model_path,
            python_model=model,
            # NOTE(review): presumably this should name the file that
            # defines ModelTest -- confirm the path.
            code_path=['Modelthird.py'],
            conda_env={
                'channels': ['defaults', 'conda-forge'],
                'dependencies': [
                    'mlflow=1.6.0',
                    'numpy=1.18.1',
                    'tensorflow=2.1.0',
                    'pandas=0.25.3',
                    'python=3.7.6',
                    'cloudpickle==0.5.8'
                ],
                'name': 'mlflow-env'
            }
        )


# Run the training/saving pipeline only when executed as a script.
if __name__ == "__main__":
    run()

Когда я запускаю run.py, при сохранении модели я получаю следующую ошибку:

 Traceback (most recent call last):

File "run.py", line 116, in <module>
    run()
  File "run.py", line 110, in run
    'name': 'mlflow-env'
  File "/opt/conda/lib/python3.7/site-packages/mlflow/pyfunc/__init__.py", line 596, in save_model
    code_paths=code_path, mlflow_model=mlflow_model)
  File "/opt/conda/lib/python3.7/site-packages/mlflow/pyfunc/model.py", line 141, in _save_model_with_class_artifacts_params
    cloudpickle.dump(python_model, out)
  File "/opt/conda/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 1109, in dump
    CloudPickler(file, protocol=protocol).dump(obj)
  File "/opt/conda/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
  File "/opt/conda/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py", line 873, in __reduce__
    return convert_to_tensor, (self._numpy(),)
  File "/opt/conda/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py", line 910, in _numpy
    six.raise_from(core._status_to_exception(e.code, e.message), None)
  File "<string>", line 3, in raise_from

Я просмотрел различную документацию, связанную с сохранением и сериализацией моделей TensorFlow, но документации по моделям TensorFlow и пользовательским функциям pyfunc в MLflow не так много. Может ли кто-нибудь мне помочь или что-нибудь подсказать?

Заранее спасибо!! : D


person Luis Gasco Sanchez    schedule 26.02.2020    source источник
comment
Удалось ли вам это решить?   -  person rv123    schedule 15.02.2021