CI/CD for Machine Learning Model Training with Multi-threaded and Batch Inference

Prerequisites

  • Azure account
  • Azure Storage account
  • Azure DevOps
  • Azure Databricks
  • Azure DevOps: install the Databricks extension from the Marketplace
  • GitHub to store the code

Data

The walkthrough uses the red and white wine quality datasets (winequality_red.csv and winequality_white.csv), which the notebook reads from DBFS.
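
If the files are not yet in the workspace, they can be fetched and copied into DBFS first. A minimal sketch, assuming the datasets' well-known UCI repository URLs and a DBFS target path (both placeholders to verify against your setup):

import urllib.request

# Assumed UCI locations of the wine quality datasets; verify the URLs before use.
base = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality"
for name in ("winequality-red.csv", "winequality-white.csv"):
    urllib.request.urlretrieve(f"{base}/{name}", f"/tmp/{name}")

# Copy into DBFS so the notebook can read the files; replace <username> with your username.
dbutils.fs.cp("file:/tmp/winequality-red.csv", "dbfs:/FileStore/shared_uploads/<username>/winequality_red.csv")
dbutils.fs.cp("file:/tmp/winequality-white.csv", "dbfs:/FileStore/shared_uploads/<username>/winequality_white.csv")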

Steps

  • First, create a notebook using Python
  • Create a cluster with 8.2 ML as the runtime (a REST API sketch for creating it programmatically follows this list)
  • Train the model as shown in the code below
  • Register the model
  • Use the model for batch scoring
  • Save the results back to storage as Delta
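
The 8.2 ML cluster can also be created programmatically through the Databricks Clusters API. A minimal sketch, assuming a workspace URL, a personal access token, and an Azure node type (all placeholders, not taken from the walkthrough):

import requests

host = "https://adb-xxxxxxxxxxx.x.azuredatabricks.net"  # placeholder workspace URL
token = "<databricks_token>"  # placeholder personal access token

# Databricks ML runtimes use spark_version strings like "8.2.x-cpu-ml-scala2.12" (assumed naming).
payload = {
    "cluster_name": "ml-training",
    "spark_version": "8.2.x-cpu-ml-scala2.12",
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 2,
}
resp = requests.post(
    f"{host}/api/2.0/clusters/create",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
print(resp.json())  # returns the new cluster_id on success

The notebook code for the steps above follows.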
import pandas as pd

# In the following lines, replace <username> with your username.
white_wine = pd.read_csv("/dbfs/FileStore/shared_uploads/<username>/winequality_white.csv", sep=';')
red_wine = pd.read_csv("/dbfs/FileStore/shared_uploads/<username>/winequality_red.csv", sep=';')
red_wine['is_red'] = 1
white_wine['is_red'] = 0

data = pd.concat([red_wine, white_wine], axis=0)

# Remove spaces from column names
data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)
data.head()
  • Show the plots
import seaborn as sns
# Note: sns.distplot is deprecated in recent seaborn releases; sns.histplot is the replacement.
sns.distplot(data.quality, kde=False)
high_quality = (data.quality >= 7).astype(int)
data.quality = high_quality
import matplotlib.pyplot as plt

dims = (3, 4)

f, axes = plt.subplots(dims[0], dims[1], figsize=(25, 15))
axis_i, axis_j = 0, 0
for col in data.columns:
  if col == 'is_red' or col == 'quality':
    continue # Box plots cannot be used on indicator variables
  sns.boxplot(x=high_quality, y=data[col], ax=axes[axis_i, axis_j])
  axis_j += 1
  if axis_j == dims[1]:
    axis_i += 1
    axis_j = 0
data.isna().any()
from sklearn.model_selection import train_test_split

train, test = train_test_split(data, random_state=123)
X_train = train.drop(["quality"], axis=1)
X_test = test.drop(["quality"], axis=1)
y_train = train.quality
y_test = test.quality
import cloudpickle
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env

# The predict method of sklearn's RandomForestClassifier returns a binary classification (0 or 1). 
# The following code creates a wrapper function, SklearnModelWrapper, that uses 
# the predict_proba method to return the probability that the observation belongs to each class. 

class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
  def __init__(self, model):
    self.model = model
    
  def predict(self, context, model_input):
    return self.model.predict_proba(model_input)[:,1]

experiment_name = "/Shared/wine_experiment/"
mlflow.set_experiment(experiment_name)
# mlflow.start_run creates a new MLflow run to track the performance of this model. 
# Within the context, you call mlflow.log_param to keep track of the parameters used, and
# mlflow.log_metric to record metrics like accuracy.
with mlflow.start_run(run_name='untuned_random_forest'):
  n_estimators = 10
  model = RandomForestClassifier(n_estimators=n_estimators, random_state=np.random.RandomState(123))
  model.fit(X_train, y_train)

  # predict_proba returns [prob_negative, prob_positive], so slice the output with [:, 1]
  predictions_test = model.predict_proba(X_test)[:,1]
  auc_score = roc_auc_score(y_test, predictions_test)
  mlflow.log_param('n_estimators', n_estimators)
  # Use the area under the ROC curve as a metric.
  mlflow.log_metric('auc', auc_score)
  wrappedModel = SklearnModelWrapper(model)
  # Log the model with a signature that defines the schema of the model's inputs and outputs. 
  # When the model is deployed, this signature will be used to validate inputs.
  signature = infer_signature(X_train, wrappedModel.predict(None, X_train))
  
  # MLflow contains utilities to create a conda environment used to serve models.
  # The necessary dependencies are added to a conda.yaml file which is logged along with the model.
  conda_env = _mlflow_conda_env(
      additional_conda_deps=None,
      additional_pip_deps=[
          "cloudpickle=={}".format(cloudpickle.__version__),
          "scikit-learn=={}".format(sklearn.__version__),
      ],
      additional_conda_channels=None,
  )
  mlflow.pyfunc.log_model("random_forest_model", python_model=wrappedModel, conda_env=conda_env, signature=signature)
feature_importances = pd.DataFrame(model.feature_importances_, index=X_train.columns.tolist(), columns=['importance'])
feature_importances.sort_values('importance', ascending=False)
run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"').iloc[0].run_id
# If you see the error "PERMISSION_DENIED: User does not have any permission level assigned to the registered model", 
# the cause may be that a model already exists with the name "wine_quality". Try using a different name.
model_name = "wine_quality"
model_version = mlflow.register_model(f"runs:/{run_id}/random_forest_model", model_name)
from mlflow.tracking import MlflowClient

client = MlflowClient()
client.transition_model_version_stage(
  name=model_name,
  version=model_version.version,
  stage="Production",
)
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")

# Sanity-check: This should match the AUC logged by MLflow
print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
import mlflow.xgboost
import xgboost as xgb
params = {'max_depth': 2, 'eta': 1, 'objective': 'binary:logistic'}
mlflow.xgboost.autolog()
with mlflow.start_run(run_name='xgboost'):
  train = xgb.DMatrix(data=X_train, label=y_train)
  test = xgb.DMatrix(data=X_test, label=y_test)
  # Pass in the test set so xgb can track an evaluation metric. XGBoost terminates training when the evaluation metric
  # is no longer improving.
  booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\
                      evals=[(test, "test")], early_stopping_rounds=50)
  predictions_test = booster.predict(test)
  auc_score = roc_auc_score(y_test, predictions_test)
  mlflow.log_metric('auc', auc_score)

  signature = infer_signature(X_train, booster.predict(train))
  mlflow.xgboost.log_model(booster, "model", signature=signature)
# Reload the Production model. This is still the random forest registered above,
# unless the new xgboost model has also been registered and promoted.
model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")
print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
# To simulate a new corpus of data, save the existing X_train data to a Delta table. 
# In the real world, this would be a new batch of data.
spark_df = spark.createDataFrame(X_train)
table_path = "dbfs:/mlflowdata/delta/wine_data"
# Delete the contents of this path in case this cell has already been run
dbutils.fs.rm(table_path, True)
spark_df.write.format("delta").save(table_path)
import mlflow.pyfunc

apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/production")
# Read the "new data" from Delta
new_data = spark.read.format("delta").load(table_path)
display(new_data)
from pyspark.sql.functions import struct

# Apply the model to the new data
udf_inputs = struct(*(X_train.columns.tolist()))

new_data = new_data.withColumn(
  "prediction",
  apply_model_udf(udf_inputs)
)
display(new_data)
new_data.write.format("delta").mode("overwrite").save("/mnt/mlflow/wineoutput")
df1 = spark.read.format("delta").load("/mnt/mlflow/wineoutput")
display(df1)

Azure DevOps

  • Go to Azure DevOps
  • Create a project
  • Create a new build
  • Connect to the repository
  • Create a new pipeline as azure-pipelines.yml
  • Sample code is available in this repo: https://github.com/balakreshnan/sparkops
  • First, create the pipeline variables
clusterid 
databricks_token
  • clusterid comes from the Azure Databricks workspace; it is shown in the cluster's JSON view, or can be fetched via the REST API as in the sketch below.
  • databricks_token is a personal access token; mark it as a secret variable in the pipeline.
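A minimal sketch of looking up clusterid through the Databricks Clusters API (the workspace URL and token are placeholders):

import requests

host = "https://adb-xxxxxxxxxxx.x.azuredatabricks.net"  # placeholder workspace URL
token = "<databricks_token>"  # placeholder personal access token

resp = requests.get(
    f"{host}/api/2.0/clusters/list",
    headers={"Authorization": f"Bearer {token}"},
)
for cluster in resp.json().get("clusters", []):
    print(cluster["cluster_id"], cluster["cluster_name"])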
  • The DevOps build pipeline code:
trigger:
- main

pool:
  vmImage: ubuntu-latest

steps:
- script: echo Hello, world!
  displayName: 'Run a one-line script'

- script: |
    echo Add other tasks to build, test, and deploy your project.
    echo See https://aka.ms/yaml
  displayName: 'Run a multi-line script'

- task: UsePythonVersion@0
  inputs:
    versionSpec: '3.7'
    addToPath: true
    architecture: 'x64'

- task: CopyFiles@2
  inputs:
    SourceFolder: 'notebooks'
    Contents: '**'
    TargetFolder: '$(Build.SourcesDirectory)'

- task: DownloadGitHubRelease@0
  inputs:
    connection: 'balakreshnan'
    userRepository: 'balakreshnan/sparkops'
    defaultVersionType: 'latest'
    downloadPath: '$(System.ArtifactsDirectory)'

- task: Bash@3
  inputs:
    targetType: 'inline'
    script: |
      # Write your commands here
      
      ls -l $(System.ArtifactsDirectory)

- task: configuredatabricks@0
  inputs:
    url: 'https://adb-xxxxxxxxxxx.x.azuredatabricks.net'
    token: '$(databricks_token)'

- task: startcluster@0
  inputs:
    clusterid: '$(clusterid)'

- task: executenotebook@0
  inputs:
    notebookPath: '/Users/[email protected]/ML/xgboost-python'
    existingClusterId: '$(clusterid)'

- task: executenotebook@0
  inputs:
    notebookPath: '/Users/[email protected]/ML/pytorch-single-node'
    existingClusterId: '$(clusterid)'


- task: executenotebook@0
  inputs:
    notebookPath: '/Users/[email protected]/ML/mlflowexp'
    existingClusterId: '$(clusterid)'

  • Check the Azure Databricks workspace to confirm the notebooks ran successfully (a REST API sketch for this follows).
  • We can use Azure Data Factory or Azure Synapse Integrate to automate model retraining and batch scoring/inference.
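
As an alternative to checking the workspace UI, recent notebook runs can be inspected through the Jobs Runs API. A minimal sketch (workspace URL and token are placeholders):

import requests

host = "https://adb-xxxxxxxxxxx.x.azuredatabricks.net"  # placeholder workspace URL
token = "<databricks_token>"  # placeholder personal access token

resp = requests.get(
    f"{host}/api/2.0/jobs/runs/list",
    headers={"Authorization": f"Bearer {token}"},
    params={"limit": 5},
)
for run in resp.json().get("runs", []):
    state = run.get("state", {})
    print(run["run_id"], state.get("life_cycle_state"), state.get("result_state"))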

Originally published at https://github.com.