CI/CD для обучения модели машинного обучения с помощью многопоточного и пакетного логического вывода
Предпосылка
- Учетная запись Azure
- Учетная запись хранения Azure
- Azure DevOps
- Azure Databricks
- Azure DevOps — установка Databricks Market Place
- Github для хранения кода
Данные
- Перейдите к https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/ и загрузите файлы winequality-red.csv и winequality-white.csv на свой локальный компьютер.
- Скачивание на локальные вычисления и отправка в Azure databricks в папку
Шаги
- Сначала создайте блокнот с помощью python
- Создайте кластер с 8,2ML в качестве среды выполнения.
- Вот обучение
- Регистрация модели
- Используйте модель для пакетной оценки
- Сохраните его обратно в хранилище как дельту
import pandas as pd # In the following lines, replace <username> with your username. white_wine = pd.read_csv("/dbfs/FileStore/shared_uploads/username/winequality_white.csv", sep=';') red_wine = pd.read_csv("/dbfs/FileStore/shared_uploads/username/winequality_red.csv", sep=';')
red_wine['is_red'] = 1 white_wine['is_red'] = 0 data = pd.concat([red_wine, white_wine], axis=0) # Remove spaces from column names data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)
data.head()
- Показать диаграмму
import seaborn as sns sns.distplot(data.quality, kde=False)
high_quality = (data.quality >= 7).astype(int) data.quality = high_quality
import matplotlib.pyplot as plt dims = (3, 4) f, axes = plt.subplots(dims[0], dims[1], figsize=(25, 15)) axis_i, axis_j = 0, 0 for col in data.columns: if col == 'is_red' or col == 'quality': continue # Box plots cannot be used on indicator variables sns.boxplot(x=high_quality, y=data[col], ax=axes[axis_i, axis_j]) axis_j += 1 if axis_j == dims[1]: axis_i += 1 axis_j = 0
data.isna().any()
from sklearn.model_selection import train_test_split train, test = train_test_split(data, random_state=123) X_train = train.drop(["quality"], axis=1) X_test = test.drop(["quality"], axis=1) y_train = train.quality y_test = test.quality
import mlflow import mlflow.pyfunc import mlflow.sklearn import numpy as np import sklearn from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import roc_auc_score from mlflow.models.signature import infer_signature from mlflow.utils.environment import _mlflow_conda_env # The predict method of sklearn's RandomForestClassifier returns a binary classification (0 or 1). # The following code creates a wrapper function, SklearnModelWrapper, that uses # the predict_proba method to return the probability that the observation belongs to each class. class SklearnModelWrapper(mlflow.pyfunc.PythonModel): def __init__(self, model): self.model = model def predict(self, context, model_input): return self.model.predict_proba(model_input)[:,1] experiment_name = "/Shared/wine_experiment/" mlflow.set_experiment(experiment_name) # mlflow.start_run creates a new MLflow run to track the performance of this model. # Within the context, you call mlflow.log_param to keep track of the parameters used, and # mlflow.log_metric to record metrics like accuracy. with mlflow.start_run(run_name='untuned_random_forest'): n_estimators = 10 model = RandomForestClassifier(n_estimators=n_estimators, random_state=np.random.RandomState(123)) model.fit(X_train, y_train) # predict_proba returns [prob_negative, prob_positive], so slice the output with [:, 1] predictions_test = model.predict_proba(X_test)[:,1] auc_score = roc_auc_score(y_test, predictions_test) mlflow.log_param('n_estimators', n_estimators) # Use the area under the ROC curve as a metric. mlflow.log_metric('auc', auc_score) wrappedModel = SklearnModelWrapper(model) # Log the model with a signature that defines the schema of the model's inputs and outputs. # When the model is deployed, this signature will be used to validate inputs. signature = infer_signature(X_train, wrappedModel.predict(None, X_train)) # MLflow contains utilities to create a conda environment used to serve models. # The necessary dependencies are added to a conda.yaml file which is logged along with the model. conda_env = _mlflow_conda_env( additional_conda_deps=None, additional_pip_deps=["cloudpickle=={}".format(cloudpickle.__version__), "scikit-learn=={}".format(sklearn.__version__)], additional_conda_channels=None, ) mlflow.pyfunc.log_model("random_forest_model", python_model=wrappedModel, conda_env=conda_env, signature=signature)
feature_importances = pd.DataFrame(model.feature_importances_, index=X_train.columns.tolist(), columns=['importance']) feature_importances.sort_values('importance', ascending=False)
run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"').iloc[0].run_id
# If you see the error "PERMISSION_DENIED: User does not have any permission level assigned to the registered model", # the cause may be that a model already exists with the name "wine_quality". Try using a different name. model_name = "wine_quality" model_version = mlflow.register_model(f"runs:/{run_id}/random_forest_model", model_name)
from mlflow.tracking import MlflowClient client = MlflowClient() client.transition_model_version_stage( name=model_name, version=model_version.version, stage="Production", )
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production") # Sanity-check: This should match the AUC logged by MLflow print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK from hyperopt.pyll import scope from math import exp import mlflow.xgboost import numpy as np import xgboost as xgb
params = {'max_depth': 2, 'eta': 1, 'objective': 'binary:logistic'} mlflow.xgboost.autolog() with mlflow.start_run(nested=True): train = xgb.DMatrix(data=X_train, label=y_train) test = xgb.DMatrix(data=X_test, label=y_test) # Pass in the test set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric # is no longer improving. booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\ evals=[(test, "test")], early_stopping_rounds=50) predictions_test = booster.predict(test) auc_score = roc_auc_score(y_test, predictions_test) mlflow.log_metric('auc', auc_score) signature = infer_signature(X_train, booster.predict(train)) mlflow.xgboost.log_model(booster, "model", signature=signature)
mlflow.end_run()
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production") print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
# To simulate a new corpus of data, save the existing X_train data to a Delta table. # In the real world, this would be a new batch of data. spark_df = spark.createDataFrame(X_train) # Replace <username> with your username before running this cell. table_path = "dbfs:/mlflowdata/delta/wine_data" # Delete the contents of this path in case this cell has already been run dbutils.fs.rm(table_path, True) spark_df.write.format("delta").save(table_path)
import mlflow.pyfunc apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/production")
# Read the "new data" from Delta new_data = spark.read.format("delta").load(table_path) display(new_data)
from pyspark.sql.functions import struct # Apply the model to the new data udf_inputs = struct(*(X_train.columns.tolist())) new_data = new_data.withColumn( "prediction", apply_model_udf(udf_inputs) ) display(new_data)
new_data.write.format("delta").mode("overwrite").save("/mnt/mlflow/wineoutput") df1 = spark.read.format("delta").load("/mnt/mlflow/wineoutput") display(df1)
Azure DevOps
- перейти к DevOps
- Создать проект
- Создать новую сборку
- Подключиться к репозиторию
- Создайте новый конвейер как azure-pipelines.yml.
- Пример кода доступен в этом репо — https://github.com/balakreshnan/sparkops
- Сначала создайте переменные
clusterid
databricks_token
- Кластер получен из рабочей области Azure Databricks с использованием выходных данных JSON.
- databricks_token — токен личного доступа
- Код конвейера сборки DevOps
trigger:
- main
pool:
vmImage: ubuntu-latest
steps:
- script: echo Hello, world!
displayName: 'Run a one-line script'
- script: |
echo Add other tasks to build, test, and deploy your project.
echo See https://aka.ms/yaml
displayName: 'Run a multi-line script'
- task: UsePythonVersion@0
inputs:
versionSpec: '3.7'
addToPath: true
architecture: 'x64'
- task: CopyFiles@2
inputs:
SourceFolder: 'notebooks'
Contents: '**'
TargetFolder: '$(Build.SourcesDirectory)'
- task: DownloadGitHubRelease@0
inputs:
connection: 'balakreshnan'
userRepository: 'balakreshnan/sparkops'
defaultVersionType: 'latest'
downloadPath: '$(System.ArtifactsDirectory)'
- task: Bash@3
inputs:
targetType: 'inline'
script: |
# Write your commands here
ls -l $(System.ArtifactsDirectory)
- task: configuredatabricks@0
inputs:
url: 'https://adb-xxxxxxxxxxx.x.azuredatabricks.net'
token: '$(databricks_token)'
- task: startcluster@0
inputs:
clusterid: '$(clusterid)'
- task: executenotebook@0
inputs:
notebookPath: '/Users/[email protected]/ML/xgboost-python'
existingClusterId: '$(clusterid)'
- task: executenotebook@0
inputs:
notebookPath: '/Users/[email protected]/ML/pytorch-single-node'
existingClusterId: '$(clusterid)'
- task: executenotebook@0
inputs:
notebookPath: '/Users/[email protected]/ML/mlflowexp'
existingClusterId: '$(clusterid)'
- Проверьте блоки данных Azure, чтобы убедиться, что записные книжки выполнены успешно.
- мы можем использовать Фабрику данных Azure или Azure Synapse Integrate для переобучения модели и оценки партии или автоматизации логического вывода.
Первоначально опубликовано на https://github.com.