Как перейти от модели обработки данных в записной книжке к базе кода для моделей обучения с помощью служб машинного обучения Azure и MLflow.

Краткое содержание

В этом четырехэтапном руководстве я возьму на себя роль инженера по машинному обучению и проведу вас через процесс преобразования блокнота по обработке данных в код многократного использования. В конце этого руководства мы вызовем конечную точку для обученной модели машинного обучения из информационной панели на основе Python. Весь конвейер будет запущен с использованием az cli и VSCode.

Конвейер MLOPS

Теперь, когда модель выбрана и протестирована в среде ноутбука, пришло время очистить базу кода. На втором этапе жизненного цикла модели мы упаковываем модель. Нам нужно навести порядок в блокноте, создать методы, применить линтинг и написать тесты для создаваемых нами пользовательских функций.

Режим ноутбука:

import os
import argparse
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    args = parser.parse_args()
   
    # start Logging
    mlflow.start_run()

    # enable autologging
    mlflow.sklearn.autolog()

    ###################
    #<prepare the data>
    ###################
    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)
    
    credit_df = pd.read_csv(args.data, header=1, index_col=0)

    mlflow.log_metric("num_samples", credit_df.shape[0])
    mlflow.log_metric("num_features", credit_df.shape[1] - 1)

    train_df, test_df = train_test_split(
        credit_df,
        test_size=args.test_train_ratio,
    )
    ###################
    #</prepare the data>
    ###################

    ##################
    #<train the model>
    ##################
    # extracting the label column
    y_train = train_df.pop("default payment next month")

    # convert the dataframe values to array
    X_train = train_df.values

    # extracting the label column
    y_test = test_df.pop("default payment next month")

    # convert the dataframe values to array
    X_test = test_df.values

    print(f"Training with data of shape {X_train.shape}")

    clf = GradientBoostingClassifier(
        n_estimators=args.n_estimators, learning_rate=args.learning_rate
    )
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)

    print(classification_report(y_test, y_pred))

    ##################
    #</train the model>
    ##################

    ##########################
    #<save and register model>
    ##########################
    # registering the model to the workspace
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=clf,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name,
    )

    # saving the model to a file
    mlflow.sklearn.save_model(
        sk_model=clf,
        path=os.path.join(args.registered_model_name, "trained_model"),
    )
    
    # stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

В приведенном выше примере мы сначала подготавливаем данные, а затем обучаем модель. Для этих шагов мы можем создать два независимых класса Python и разделить их на два файла: prep.py и train.py. Все общие функции, используемые в обоих классах, будут помещены в utils.py.

src/
  mlops/
    prep.py
    train.py
    utils.py

Подготовьте данные

Чтобы запускать файлы Python как задания в Azure ML, нам сначала нужно создать класс main() и сделать его гибким, добавив возможность предоставления аргументов.

# This method lets us add arguments when running the Python script 
def parse_args():
    """Parse input arguments"""
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw_data", type=str, help="Path to raw data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.75)
    parser.add_argument("--cfg", type=str, help="Path to config file")
    parser.add_argument("--base_path", type=str, help="Path to all files")

    return parser.parse_args()

Чтобы задание выполнялось в Azure ML, имя функции должно быть main().

if __name__ == "__main__":
    mlflow.start_run()
    mlflow.sklearn.autolog()
    # Getting the command line argumetns 
    args = parse_args()
    # Returning some information
    lines = [
        f"Raw data path: {args.raw_data}"
    ]

    for line in lines:
        print(line)
    # Calling the main function with the arguments
    main(args)

    mlflow.end_run()

Итак, теперь у нас есть наша структура:

  • Мы будем читать аргументы из командной строки для большей гибкости.
  • Мы вызовем нашу функцию с этими аргументами, и эта функция называется main().
  • Нам нужно загрузить данные из файла (в нашем случае файлов .csv), который в идеале будет работать и с файлами разных форматов.
def load_data(path: str) -> pd.DataFrame:
    """Function to load the data from the source.

    Args:
        path: File path.

    Returns:
        Dataframe.
    """
    return pd.read_csv(path, header=1, index_col=0)
  • После загрузки данных мы хотим разделить их на обучающие и тестовые (проверка пока будет пропущена), используя параметр test_train_ratio.
def prepare_data(data: pd.DataFrame, test_train_ratio: float) -> pd.DataFrame:
    """Function to read and split the data in train and test sets.

    Args:
        test_train_ratio: Test-train ratio.
        data: Dataframe.

    Returns:
        Train and test split sets.
    """
    mlflow.log_metric("num_samples", data.shape[0])
    mlflow.log_metric("num_features", data.shape[1] - 1)

    msk = np.random.rand(len(data)) < test_train_ratio

    train_df = data[msk]
    test_df = data[~msk]

    return train_df, test_df

Мы загружаем информацию из файла конфигурации. Этот код используется несколько раз и добавляется в файл utils.py.

import yaml


def load_config(config_path: str) -> dict:
    """Function to load config file in yml format."""
    cfg = []
    with open(config_path, "r") as ymlfile:
        cfg = yaml.safe_load(ymlfile)
    return cfg

При работе с Azure ML будет много параметров, которые мы не хотим постоянно предоставлять или запоминать в качестве входных данных командной строки. Вот файл config.yml, который я использовал для этого примера.

connections:
  tenant_id: ""
  subscription_id: "xxxxx"
  resource_group: "xxxxx"
  workspace: "xxxxx"
deployments:
  job_name: ""
  endpoint_name: ""
  endpoint_url: ""
data:
  sample_request: ""
  file_name: ""
  datastore: "xxxxx"
  train_data: "train.parquet"
  test_data: "test.parquet"
model:
  registered_name: "credit_defaults_model"
  version: 1
  workspace_id: ""
  folder_path: "model"
training:
  job_name: ""
  environment: ""
  experiment: ""
  test_train_ratio: 0.0
  learning_rate: 0.0
compute:
  cluster_name: ""
  image: ""
  environment_name: ""
  conda_file: ""

Если собрать все вместе, то получим следующий скрипт prep.py:

"""Main file for training and regitering the model."""
import argparse
from pathlib import Path

import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np

import os, sys
# For now we don't create a package and work without Docker,
# therefore we need to add the file path to the system path to be able
# import code from the utils file 
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
from utils import load_config

def parse_args():
    """Parse input arguments"""
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw_data", type=str, help="Path to raw data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.75)
    parser.add_argument("--cfg", type=str, help="Path to config file")
    parser.add_argument("--base_path", type=str, help="Path to all files")

    return parser.parse_args()

def main(args) -> None:
    """Main function to train and register the model."""

    # Getting the configuration file
    cfg = load_config(args.cfg)
    # Loading the data 
    df = load_data(args.raw_data)
    # Preparing the data
    train_df, test_df = prepare_data(data=df, test_train_ratio=args.test_train_ratio)
    # MLflow logs
    mlflow.log_metric("train size", train_df.shape[0])
    mlflow.log_metric("test size", test_df.shape[0])
    # Writting the data back to parquet 
    train_df.to_parquet(args.base_path / (Path(cfg["data"]["train_data"])))
    test_df.to_parquet(args.base_path / (Path(cfg["data"]["test_data"])))
    
    print(f"Train dataset output path: {cfg['data']['train_data']}")
    print(f"Test dataset path: {cfg['data']['test_data']}")

def load_data(path: str) -> pd.DataFrame:
    """Function to load the data from the source.

    Args:
        path: File path.

    Returns:
        Dataframe.
    """
    return pd.read_csv(path, header=1, index_col=0)

def prepare_data(data: pd.DataFrame, test_train_ratio: float) -> pd.DataFrame:
    """Function to read and split the data in train and test sets.

    Args:
        test_train_ratio: Test-train ratio.
        data: Dataframe.

    Returns:
        Train and test split sets.
    """
    mlflow.log_metric("num_samples", data.shape[0])
    mlflow.log_metric("num_features", data.shape[1] - 1)

    msk = np.random.rand(len(data)) < test_train_ratio

    train_df = data[msk]
    test_df = data[~msk]

    return train_df, test_df

if __name__ == "__main__":
    mlflow.start_run()
    mlflow.sklearn.autolog()

    args = parse_args()

    lines = [
        f"Raw data path: {args.raw_data}"
    ]

    for line in lines:
        print(line)

    main(args, cfg)

    mlflow.end_run()

Обучите модель:

Мы будем использовать ту же структуру для обучения нашей модели:

  • Мы будем читать аргументы из командной строки для большей гибкости.
  • Мы вызовем нашу функцию с этими аргументами, и эта функция называется main().
  • Мы будем обучать нашу модель с помощью функции GradientBoostClassifier из научного набора.
def train_model(
    learning_rate: float,
    n_estimators: int,
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
) -> GradientBoostingClassifier:
    """Function to read and split the data in train and test sets.

    Args:
        learning_rate: Learning rate.
        n_estimators: Number estimators.
        X_train: Train data set dataframe.
        y_train: Test label set dataframe.

    Returns:
        Gradient boosting classifier.
    """
    clf = GradientBoostingClassifier(
        n_estimators=n_estimators, learning_rate=learning_rate
    )
    clf.fit(X_train, y_train)

    return clf
  • После обучения модели она будет сохранена в заранее определенном месте.
def save_model_to_file(
    model_output: str, 
    clf: GradientBoostingClassifier
) -> None:
    """Function to save the model to file.

    Args:
        model_output: Path for saving the model.
        clf: Gradient boosting classifier.
    """
    mlflow.sklearn.save_model(sk_model=clf, path=model_output)

Весь скрипт train.py:

"""Main file for training and regitering the model."""
import argparse
from pathlib import Path

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report


def parse_args():
    """Parse input arguments"""
    parser = argparse.ArgumentParser()
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--model_output", type=str, help="Path of output model")
    parser.add_argument("--train_data", type=str, help="Path to train dataset")
    parser.add_argument("--test_data", type=str, help="Path to test dataset")

    return parser.parse_args()


def main(args) -> None:
    """Main function to train and register the model."""
    # Load the previously prepared data
    train_df = pd.read_parquet(Path(args.train_data) / "train.parquet")
    test_df = pd.read_parquet(Path(args.test_data) / "test.parquet")
    
    # Split into input and output
    y_train = train_df.pop("default payment next month")
    X_train = train_df.values
    y_test = test_df.pop("default payment next month")
    X_test = test_df.values
    # Train the model
    clf = train_model(args.learning_rate, args.n_estimators, X_train, y_train)
    # Call predict on the model 
    y_pred = clf.predict(X_test)
    # Get the model score 
    print(classification_report(y_test, y_pred))
    # Store the model
    save_model_to_file(args.model_output, clf)


def train_model(
    learning_rate: float,
    n_estimators: int,
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
) -> GradientBoostingClassifier:
    """Function to read and split the data in train and test sets.

    Args:
        learning_rate: Learning rate.
        n_estimators: Number estimators.
        X_train: Train data set dataframe.
        y_train: Test label set dataframe.

    Returns:
        Gradient boosting classifier.
    """
    clf = GradientBoostingClassifier(
        n_estimators=n_estimators, learning_rate=learning_rate
    )
    clf.fit(X_train, y_train)

    return clf


def save_model_to_file(
    model_output: str, 
    clf: GradientBoostingClassifier
) -> None:
    """Function to save the model to file.

    Args:
        model_output: Path for saving the model.
        clf: Gradient boosting classifier.
    """
    mlflow.sklearn.save_model(sk_model=clf, path=model_output)


if __name__ == "__main__":
    mlflow.start_run()
    mlflow.sklearn.autolog()

    args = parse_args()

    main(args)

    mlflow.end_run()

Пока я пропущу юнит-тесты. Созданные нами пользовательские методы основаны на других библиотеках и практически не содержат нового кода. В производственной среде необходимо отслеживать покрытие тестированием и поддерживать его на высоком уровне.

Для линтинга я использовал Flake8. Чтобы использовать flake8, вам необходимо создать файл .flake8, содержащий ваши конфигурации проверки. Это мое:

[flake8]
select = DAR,BLK,C,D,E,I,F,W,ANN
ignore = E501,I001,I005,DAR401,D401,W503,E401,E402
max-complexity = 10
max-line-length = 95
docstring-convention = google
strictness = short

Я использую предварительную фиксацию, которая гарантирует запуск flake8 всякий раз, когда я хочу вернуть код.

Чтобы вызвать эти две функции, нам нужно будет создать задания в Azure ML и определить зависимости между ними.

В следующей статье я объясню, как расширить приведенную выше базу кода для оценки и регистрации модели в Azure ML. Мы будем писать код третьего этапа жизненного цикла модели.