Узнайте о снежном парке Снежинки, помогая грузовикам с едой найти лучшие места для парковки для предстоящих смен.

Мы рассмотрим использование Snowpark для следующих основных компонентов рабочего процесса обработки данных:

  • Доступ к данным
  • Изучение данных
  • Разработка функций
  • Обучение модели
  • Развертывание модели
  • Вывод модели

Чтобы пройти через рабочий процесс обработки данных, мы будем помогать (поддельным) грузовикам с едой находить лучшие места для парковки. Для этого мы построили модель, которая прогнозирует продажи всех парковок на ближайшие утренние и вечерние смены.

Весь код доступен в этом Пошаговом руководстве по началу работы.

Что такое Snowpark для Python?

Snowpark позволяет разработчикам запрашивать и развертывать функции на Python с помощью API и программных конструкций в стиле DataFrame, которые работают на эластичном движке Snowflake.

Что такое фрейм данных Snowpark?

DataFrame представляет реляционный набор данных, который оценивается лениво; он выполняется только тогда, когда запускается определенное действие.

Он позволяет пользователям работать в среде Python, выполнять цепные преобразования и использовать встроенные функции Snowflake без перемещения данных из Snowflake. Преобразования Snowpark DataFrame транслируются и выполняются как SQL в Snowflake. Это означает, что запросы выполняются быстро и оптимизированы.

Например, серия преобразований Snowpark DataFrame:

# Select
location_df = snowpark_df.select("date", "shift", "shift_sales", "location_id", "city")

# Filter
location_df = location_df.filter(col("location_id") == 1135)

# Sort
location_df = location_df.order_by(["date", "shift"], ascending=[0, 0])

# Display
location_df.show(n=20)

Будет выполняться как SQL в Snowflake (как показано в тексте SQL в истории запросов Snowflake):

Что такое хранимые процедуры и пользовательские функции Python?

Хранимые процедуры и определяемые пользователем функции — это механизмы для развертывания кода Python, часто с использованием пакетов Python, в Snowflake для выполнения в безопасной среде песочницы на платформе Snowflake.

Хранимые процедуры хорошо подходят для обучения, поскольку они могут считывать данные, хранить всю таблицу в памяти для поиска шаблонов и записывать файлы (например, файлы моделей) обратно в базу данных Snowflake.

Пользовательские функции хорошо подходят для логических выводов, поскольку они возвращают одно значение для каждой строки, переданной пользовательской функции. Из-за этого их можно легко распределять для получения быстрых результатов.

1. Доступ к данным

Создание кадра данных Snowpark

Мы подключаемся к Snowflake, создавая сеанс с учетными данными нашей учетной записи Snowflake. После подключения мы используем session.table, чтобы указать, с какой таблицей или представлением Snowflake мы хотим работать в качестве DataFrame.

# Create Snowpark session
session = Session.builder.configs(connection_parameters).create()

# Create Snowpark DataFrame
snowpark_df = session.table("shift_sales_v")

2. Изучение данных

При использовании Snowpark для изучения данных общий шаблон заключается в использовании Snowpark для манипулирования данными, а затем переноса сводной таблицы в нашу среду Python для визуализации. При этом используются собственная производительность и масштабируемость Snowflake для агрегирования больших наборов данных, а также простая передача агрегированных данных в клиентскую среду для визуализации.

В этом примере мы группируем данные о сменных продажах по городам и рассчитываем средние сменные продажи для каждого города. Затем сортируем по среднему. Чтобы создать гистограмму, мы используем to_pandas(), чтобы перенести отсортированную сводную таблицу в нашу среду Python, а затем построить горизонтальную гистограмму.

# Group by city and average shift sales
analysis_df = snowpark_df.group_by("city").agg(mean("shift_sales").alias("avg_shift_sales"))

# Sort by average shift sales
analysis_df = analysis_df.sort("avg_shift_sales", ascending=True)

# Pull to pandas and plot
analysis_df.to_pandas().plot.barh(x="CITY", y="AVG_SHIFT_SALES")

3. Разработка функций

Помимо возможности Snowflake работать с масштабируемыми данными, синтаксис Snowpark DataFrame упрощает преобразование в цепочку и создание новых столбцов. Мы будем использовать Snowpark для создания новой функции, выполнения ряда преобразований существующих столбцов и разделения данных для обучения и тестирования.

Создание скользящего агрегатного объекта

# Create a window that paritions on location and includes all preceding rows
window_by_location_all_days = (
    Window.partition_by("location_id", "shift")
    .order_by("date")
    .rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW - 1)
)

# Average shift sales across the window
snowpark_df = snowpark_df.with_column(
    "avg_location_shift_sales", 
    avg("shift_sales").over(window_by_location_all_days)
)

Вменение, кодирование и удаление столбцов

# Impute missing values
snowpark_df = snowpark_df.fillna(value=0, subset=["avg_location_shift_sales"])

# Encode categorical column
snowpark_df = snowpark_df.with_column("shift", iff(col("shift") == "AM", 1, 0))

# Drop columns
snowpark_df = snowpark_df.drop("location_id", "city", "date")

Разделить данные на обучение и тестирование

train_snowpark_df, test_snowpark_df = snowpark_df.randomSplit([0.8, 0.2])

Для всех этих преобразований данные из Snowflake не извлекались и копии данных не создавались.

4. Модельное обучение

Мы создадим хранимую процедуру Snowflake для обучения модели. Первым шагом является создание функции, которая выполняет обучение модели и сохраняет файл модели. Второй шаг — зарегистрировать эту функцию в Snowflake как хранимую процедуру.

Шаг 1. Создайте функцию для обучения модели

Эта функция обучает модель линейной регрессии Scikit-learn и сохраняет модель на этапе. Линейная регрессия находит линию, которая лучше всего соответствует точкам данных, используемым при обучении. Затем мы используем эту линию как оценку того, где будут выходные значения для будущих сценариев.

Обучение будет использовать историю продаж за смену и особенности наших данных, чтобы прогнозировать будущие продажи за смену в местах, где могут парковаться наши грузовики с едой.

  • Входные данные: имя обучающей таблицы в Snowflake, имена столбцов функций, имена целевых столбцов, имя файла для сохраненной модели.
  • Выходные данные: веса признаков обученной модели.
def train_linreg(
    session: Session,
    training_table: str,
    feature_cols: list,
    target_col: str,
    model_name: str,
) -> T.Variant:

    # Import packages
    from sklearn.linear_model import LinearRegression
    from joblib import dump

    # Get training data
    df = session.table(training_table).to_pandas()

    # Set inputs X and outputs y
    X = df[feature_cols]
    y = df[target_col]

    # Train model
    model = LinearRegression().fit(X, y)

    # Get feature weights
    feature_weights = pd.DataFrame({"Feature": model.feature_names_in_, "Weight": model.coef_}).to_dict()

    # Save model
    dump(model, "/tmp/" + model_name)
    session.file.put(
        "/tmp/" + model_name,
        "@MODEL_STAGE",
        auto_compress=False,
        overwrite=True
    )

    # Return feature contributions
    return feature_weights

Шаг 2. Зарегистрируйте функцию в Snowflake

Чтобы зарегистрировать функцию в Snowflake как хранимую процедуру, нам нужно указать, какие пакеты Python требуются в функции. Здесь указываем:

  • Сноупарк
  • Scikit-learn (для обучения нашей модели)
  • Joblib (для создания файла модели)

Scikit-learn — популярная библиотека Python для машинного обучения. Мы сможем использовать его функциональность в Snowflake в нашей развернутой хранимой процедуре.

train_linreg_snowflake = session.sproc.register(
    func=train_linreg,
    name="sproc_train_linreg",
    is_permanent=True,
    replace=True,
    stage_location="@MODEL_STAGE",
    packages=["snowflake-snowpark-python", "scikit-learn", "joblib"]
)

Обучите модель

После регистрации мы можем обучить нашу модель в Snowflake, вызвав нашу хранимую процедуру. Если требуется больше памяти, можно использовать Оптимизированный склад Snowpark. В этом случае достаточно стандартного склада Snowflake.

feature_weights = train_linreg_snowflake(
    session, training_table, feature_cols, target_col, model_name
)

5. Развертывание модели

Мы развернем модель как определяемую пользователем функцию в Snowflake. Это сделает модель доступной в централизованном месте, куда поступают новые данные. Это упростит получение обновленных прогнозов по сменным продажам. Как и при создании хранимой процедуры, мы сначала определяем функцию, которая делает вывод, а затем регистрируем ее в Snowflake.

Шаг 1. Создайте функцию для вывода модели

Эта функция загружает сохраненную модель и прогнозирует продажи за смену на основе входных функций.

  • Входные данные: функции
  • Результаты: прогноз продаж за смену.

Поскольку мы указали вход как кадр данных pandas, а вывод как серию pandas, определяемая пользователем функция будет выполняться пакетами, а не по одной строке за раз. Во многих случаях это сокращает общее время выполнения запроса, вызывающего пользовательскую функцию.

Мы также используем библиотеку Python cachetools для кэширования модели, возвращаемой функцией load_model. Это гарантирует, что модель загружается один раз для каждого вызова определяемой пользователем функции, а не для каждой строки или пакета данных.

# Function to load the model from file and cache the result
@cachetools.cached(cache={})
def load_model(filename):
    
    # Import packages
    import sys
    import os
    import joblib
    
    # Get the import directory where the model file is stored
    import_dir = sys._xoptions.get("snowflake_import_directory")
    
    # Get the import directory where the model file is stored
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

# Function to predict shift sales
def linreg_predict_location_sales(X: pd.DataFrame) -> pd.Series:
    
    # Load the model
    model = load_model("linreg_location_sales_model.sav")

    # Get predictions
    predictions = model.predict(X)

    # Return rounded predictions
    return predictions.round(2)

Шаг 2. Зарегистрируйте функцию в Snowflake

Чтобы зарегистрировать функцию в Snowflake как определяемую пользователем функцию, мы указываем необходимые пакеты Python. Здесь указываем:

  • Scikit-learn (для прогнозирования)
  • Joblib (для загрузки модели из файла)
  • Cachetools (для кэширования загруженной модели)

Кроме того, мы указываем, какие файлы импортировать с помощью пользовательской функции. Здесь мы указываем наш файл обученной модели как импорт.

session.udf.register(
    func=linreg_predict_location_sales,
    name="udf_linreg_predict_location_sales",
    stage_location="@MODEL_STAGE",
    input_types=[T.FloatType()] * len(feature_cols),
    return_type=T.FloatType(),
    replace=True,
    is_permanent=True,
    imports=["@MODEL_STAGE/linreg_location_sales_model.sav"],
    packages=["scikit-learn", "joblib", "cachetools"]
)

6. Вывод модели

Теперь, когда наша определяемая пользователем функция развернута, мы можем использовать ее в Snowflake для наших данных, чтобы получить прогнозы продаж по сменам.

Увеличение масштаба вычислений Snowflake

Мы можем выполнять вывод распределенной модели по узлам нашего хранилища Snowflake путем масштабирования до многоузлового хранилища. При масштабировании до среднего хранилища обработка будет распределяться по всем четырем узлам хранилища вместо однопоточного выполнения на хранилище x-small с 1 узлом. До этого момента в нашем рабочем процессе обработки данных не требовалось среднее хранилище, но теперь мы можем легко настроить наше хранилище в соответствии с вычислительными требованиями для этой части рабочего процесса.

session.sql("ALTER WAREHOUSE tasty_dsci_wh SET WAREHOUSE_SIZE = MEDIUM").collect()

Вызов функции вывода

Мы можем вызвать определяемую пользователем функцию в операторе select с помощью функции call_udf.

predictions_df = test_snowpark_df.select(
    "shift_sales",
    call_udf("udf_linreg_predict_location_sales", [col(c) for c in feature_cols]).alias("prediction")
)

стримлит

Мы использовали эту же определяемую пользователем функцию для включения приложения Streamlit (показано ниже). Приложение позволяет водителям грузовиков перемещаться в свой город и видеть на карте предполагаемые продажи за смену. Streamlit (приобретена Snowflake в марте 2022 г.) – это библиотека Python, упрощающая создание и совместное использование пользовательских веб-приложений.

Краткое содержание

Основные преимущества использования Snowpark для науки о данных

  • Эффективная работа с данными в масштабе
  • Никаких перемещений или копий данных
  • Использование масштабируемых вычислений Snowflake
  • Управление снежинкой сохраняется
  • Централизованное развертывание функций

Попробуйте!

Если вы хотите опробовать сквозной рабочий процесс, вы можете следовать этому пошаговому руководству по быстрому запуску, чтобы перейти от получения данных из Snowflake Data Marketplace, обучения модели к созданию приложения Streamlit.