В этой статье мы продолжим создание нашего пайплайна MLOps с помощью AWS SageMaker Pipelines. Важно помнить, что мы работали с набором данных, чтобы предсказать качество вина, и мы сохранили эти данные как группу признаков. Эта группа функций использовалась в качестве источника для нашей модели машинного обучения и была загружена в конвейер на этапе make_dataset. Кроме того, мы настроили первые два шага нашего конвейера с помощью make_dataset и шагов обучения. Вернемся к следующим шагам.

3. Оценка

Прежде всего, мы создаем файл с именем «evaluation.py», и его содержимым будет следующий код:

import subprocess
import warnings
import sys
import os
import argparse
from sklearn.exceptions import DataConversionWarning

def install():
    print('install dependencies')
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "/opt/ml/processing/input/requirements/requirements.txt"])#, shell=True, check=True])
    return "instalation ended"


def evaluation():
    
    import json
    import pathlib
    import pickle
    import tarfile

    import joblib
    from joblib import dump, load
    import numpy as np
    import pandas as pd
    import awswrangler as wr

    from sklearn.metrics import mean_squared_error
    from sklearn.metrics import f1_score
    
    model_path = os.path.join('/opt/ml/processing/input/model', 'model.tar.gz')
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    print('reading saved model ...-')    
    model = load(open("model.joblib", "rb"))    

    print('reading X_test file...-')   
    X_test = wr.s3.read_csv(path=f's3://sagemaker-us-east-1-XXXXXXX/tests/data/test.csv')
     
    print('reading y_test ...-')   
    y_test = X_test.pop('quality')
    
    print('Predictions ...-')   
    predictions = model.predict(X_test)

    print('computing evaluation metric ...-')   
    f1 = f1_score(y_test, predictions, average='macro')
    report_dict = {
        "classification_metrics": {
        "f1": {"value": f1},
        },
    }

    output_dir = "/opt/ml/processing/output/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
    
    return "evaluation done"

if __name__ == "__main__":
    install()
    evaluation()

В этом случае мы вычисляем балл F1 как метрику, результат сохраняется в файле json.

Теперь определим параметры для оценки:

report_metrics_output_path=f's3://{default_bucket}/tests/wines/evaluacion/'
evaluation_output_path=ParameterString(name="EvaluationOutputPath")

Созданный нами файл Json с оценочной метрикой будет загружен в AWS S3 по пути, указанному в report_metrics_output_path.

После этого мы настраиваем SKLearn Processor для запуска нашего скрипта оценки и PropertyFile для указания EvaluationReport:

# Processing step for evaluation
sklearn_processor_evaluate = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    role=role,
    sagemaker_session=sagemaker_session
)
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

Теперь мы определяем шаг оценки, мы используем следующий код:

step_eval= ProcessingStep(
    name=f"eval-wines", 
    processor=sklearn_processor_evaluate,
    inputs=[
        ProcessingInput(
            input_name='model',
            source= step_train.properties.ModelArtifacts.S3ModelArtifacts, 
            destination='/opt/ml/processing/input/model'),
        ProcessingInput(
            input_name = 'requirements',
            source = 'requirements.txt', # or S3 URI where file is loaded
            destination = '/opt/ml/processing/input/requirements')
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', 
            source='/opt/ml/processing/output/evaluation',
            destination=evaluation_output_path
        )],
    code = 'evaluation.py', # or S3 URI with evaluation file
    property_files=[evaluation_report],
    # we can pass parameters to execution
    #job_arguments=[], 
    depends_on=[f'train-wines']
)

Обратите внимание, что этот шаг зависит от шага обучения, поскольку должна быть создана модель.

4. Зарегистрируйте модель

Этот шаг состоит из регистрации сгенерированной модели в реестре моделей. Реестр моделей — это хранилище моделей, содержащее каждую развернутую версию модели. Этот шаг будет запущен, если наша модель пройдет целевую метрику, которую мы определим позже на шаге условия.

Мы определяем параметры для выполнения шага и метрик модели, файла, загруженного в S3 с метриками модели:

register_model_inference_instance_type = ParameterString(name="RegisterModelInferenceInstanceType",default_value="ml.m5.large")
register_model_metrics=ParameterString(name="RegisterModelMetrics")
register_model_package_group_name = ParameterString(name="RegisterModelPackageGroupName")
report_metrics_complete_path=f"s3://{default_bucket}/tests/wines/evaluacion/evaluation.json"
model_package_name="package-model-wines-pipeline"
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=register_model_metrics,
        content_type="application/json",
    )
)

Теперь мы настраиваем шаг, передаем модель, сгенерированную в train_step, и определяем экземпляр, необходимый для регистрации модели:

step_register = RegisterModel(
    name=f"register-wines",
    estimator=sklearn_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[register_model_inference_instance_type],
    transform_instances=[register_model_inference_instance_type],
    model_package_group_name=register_model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    depends_on=[f"eval-wines"]
)

5. Создать модель

Нам нужно указать uri образа, этот образ содержит версию фреймворка, необходимую для развертывания нашей модели, и нам нужно определить тип экземпляра для его развертывания.

image_uri = sagemaker.image_uris.retrieve(
    framework="sklearn",
    region=region,
    version=framework_version,
    py_version="py3",
    instance_type=register_model_inference_instance_type,
)

После этого мы создаем модель, передавая модель, созданную на этапе обучения, и переменные среды, необходимые для развертывания модели. Слишком важно знать, что на этапе обучения нам нужно загрузить sourcedir.tar.gz на S3. Здесь в SAGEMAKER_SUBMIT_DIRECTORY мы передаем этот URI, а в SAGEMAKER_PROGRAM мы передаем имя нашего сценария обучения.

model = Model(
    image_uri=image_uri,
    sagemaker_session=sagemaker_session,
    role=role,
    env={'SAGEMAKER_CONTAINER_LOG_LEVEL':'20',
        'SAGEMAKER_PROGRAM': 'train.py',
        'SAGEMAKER_REGION': region,
        'SAGEMAKER_SUBMIT_DIRECTORY': f's3://{default_bucket}/tests/wines/train/sourcedir.tar.gz'},
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts
)

Мы создаем ввод для экземпляра модели, чтобы зарегистрировать модель, и мы создаем «шаг создания модели»:

inputs = CreateModelInput(
    instance_type=register_model_inference_instance_type
)
step_create_model = CreateModelStep(
    name=f'create-model-wines',
    model=model,
    inputs=inputs,
    depends_on=[f"register-wines"]
)

6. Шаг условия

Этот шаг состоит в проверке соответствия метрики модели целевому значению. Если метрика не соответствует, модель не будет зарегистрирована в реестре моделей.

Мы определяем условие для модели, нам нужно прочитать отчет об оценке, загруженный в S3. В этом случае мы вычисляем относительно 0,1, это нелепая метрика, но она проиллюстрирует смысл этого шага:

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="classification_metrics.f1.value",
    ),
    right=0.1,
)

Теперь указываем шаг:

step_cond = ConditionStep(
    name=f"cond-wines",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model],
    else_steps=[]
)

Обратите внимание на if_steps, это список шагов, которые мы выполним, если наша метрика будет соответствовать.

После этого мы должны построить конвейер с шагами, которые мы настраиваем:

pipeline_name = f"ml-wines-train"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        processing_outputs_path,
        training_instance_count,
        training_instance_type,
        training_output_path,
        evaluation_output_path,
        register_model_inference_instance_type,
        register_model_package_group_name,
        register_model_metrics
    ],
    steps=[step_make_dataset, step_train, step_eval, step_cond],
)
# create the pipeline if not exists
pipeline.upsert(role_arn=role)

Наконец, мы выполняем параметры передачи конвейера, которые мы определили для выполнения:

execution = pipeline.start(
    parameters=dict(
        ProcessingOutputsPath=dataset_outputs_path,
        TrainingOutputPath=model_output_path,
        EvaluationOutputPath=report_metrics_output_path,
        RegisterModelPackageGroupName=model_package_name,
        RegisterModelMetrics=report_metrics_complete_path
    )
)

Выполнение конвейера

Теперь мы можем перейти в SageMaker Studio и проверить, как конвейер был развернут и выполнен:

Наша модель должна быть доступна в Model Registry:

Выводы

Когда модель готова к использованию, нам нужно выбрать способ, как мы будем делать выводы. Вы можете выбрать между пакетным выводом или интерактивным выводом. Разница в том, что в первом у вас есть много записей для вывода, тогда как во втором вы можете отправлять записи для вывода, например, когда у вас есть интеграция с приложениями. В этом примере мы будем работать с онлайн-выводом.

Нам нужно развернуть и конечную точку, чтобы сопоставить модель в реестре моделей, но для создания конечной точки сначала необходимо настроить конфигурацию конечной точки, чтобы указать, как модель будет развернута. Затем давайте импортируем библиотеки для логического вывода:

import json
import boto3
from sagemaker.predictor import csv_serializer,RealTimePredictor,Predictor
from sagemaker.serializers import CSVSerializer, NumpySerializer
client = boto3.client("sagemaker")

Теперь мы определяем конфигурацию конечной точки:

# Crear endpoint configuration tipo serverless
response = client.create_endpoint_config(
   EndpointConfigName="endpoint-config-wines",
   ProductionVariants=[
        {
            "ModelName": "pipelines-skbl3m2i5b09-create-model-wines-W9dRsVlfjW", 
            "VariantName": "AllTraffic",
            "ServerlessConfig": {
                "MemorySizeInMB": 1024,
                "MaxConcurrency": 5
            }
        } 
    ]
)

Обратите внимание, что мы передаем имя модели, которое можно увидеть в SageMaker Service -> Inference -> Model, и определяем бессерверную опцию для выполнения как лямбда-функция.

После этого мы развертываем конечную точку с помощью следующего кода:

response = client.create_endpoint(
    EndpointName="endpoint-wines",
    EndpointConfigName="endpoint-config-wines"
)

Мы определяем запись для тестирования модели и создаем предиктор и сериализатор:

data_wines = "6.7,0.26,0.29,7.10,0.036,28.0,100.0,0.99534,3.08,0.36,9.3"
model_predictor = Predictor('endpoint-wines')
model_predictor.serializer = CSVSerializer()

Наконец, мы делаем прогнозы:

model_predictor.predict(data_wines)

########### RESPONSE ########
b'{"instances": [{"features": 9}]}'
#############################

Вы также можете протестировать свою конечную точку с помощью среды выполнения boto3:

runtime= boto3.client('runtime.sagemaker')
data = {"data": data_wines}
response = runtime.invoke_endpoint(EndpointName='endpoint-wines',
                                   ContentType='text/csv',
                                  Body=data['data'],
                                  Accept="application/json")

Вы можете преобразовать вывод:

result = response['Body'].read().decode()
result = json.loads(result)
result


######### RESPONSE ##########
{'instances': [{'features': 9}]}
#############################

Вот мы и закончили. Надеюсь, вам понравится этот контент

Очистка окружающей среды

Конечная точка очистки:

client.delete_endpoint(EndpointName='endpoint-wines')
client.delete_endpoint_config(EndpointConfigName='endpoint-config-wines')

Очистка трубопровода:

response = client.delete_pipeline(
    PipelineName='ml-wines-train'
)

Удалить модель:

response = client.delete_model(
    ModelName='pipelines-skbl3m2i5b09-create-model-wines-W9dRsVlfjW'
)

Удалить пакет модели и группу пакетов модели:

response = client.delete_model_package(
    ModelPackageName='arn:aws:sagemaker:us-east-1:544644514035:model-package/package-model-wines-pipeline/1'
)
response = client.delete_model_package_group(
    ModelPackageGroupName='package-model-wines-pipeline'
)