Перемещение данных, обработка данных, обучение модели с помощью Azure Synapse Analytics Workspace

Унифицированная единая платформа для управления жизненным циклом Data и Data Science для повышения производительности

Примечание

  • Эта статья предназначена для демонстрации функциональности

Предпосылка

  • Учетная запись Azure
  • Рабочая область Azure Synapse Analytics
  • База данных SQL Azure
  • Хранилище Azure
  • Загрузите данные Covid19 из kaggel и импортируйте в Azure SQL в качестве смоделированных данных из внешнего источника.
  • Создайте одну таблицу как dbo.covid19data и загрузите данные

  • Приведенный выше поток архитектуры состоит из 3 компонентов.
  • Копировать действие для перемещения данных из внешнего источника и переноса в необработанную или входную зону.
  • Далее следует действие потока данных для перетаскивания ETL/ELT для преобразования или обработки данных и перехода к окончательному варианту.
  • Блокнот для запуска автоматизированного ML sdk с использованием spark для запуска модели машинного обучения
  • Мы используем регрессию для прогнозирования смертей, происходящих из-за covid 19.
  • Войдите в рабочую область Azure Synapse.
  • Создать конвейер/интегрировать
  • Перетаскивание копирования
  • для источника выберите указанный выше SQL-сервер — (необходимо создать связанные службы)

  • Затем создайте поток данных для обработки данных.
  • Для примера я использую дельту, чтобы упростить CDC.
  • Создайте источник с назначением действий копирования

  • Мы можем возобновить связанную службу назначения действия по копированию самостоятельно.
  • Теперь перетащите выделение

  • Выбрать все столбцы
  • Включите отладку и просмотрите результаты. Этот набор данных с открытым исходным кодом, поэтому нет PII

  • Выберите столбцы
  • См. отладку для проверки обработки данных.

  • Теперь к машинному обучению
  • Создать блокнот
  • Получите доступ к вышеуказанному набору данных стока в качестве входных данных для моделирования.
  • Создать блокнот

Код

  • Загрузить данные
%%pyspark
df = spark.read.load('abfss://[email protected]/covid19aggroutput/*.parquet', format='parquet')
display(df.limit(10))

  • Схема печати
df.printSchema
  • импорт необходимо
from pyspark.sql.functions import *
from pyspark.sql import *
  • преобразовать дату в тип данных даты
df1 = df.withColumn("Date", to_date("ObservationDate", "MM/dd/yyyy")) 
display(df1)

  • Создать новые столбцы
df2 = df1.withColumn("year", year(col("Date"))).withColumn("month", month(col("Date"))).withColumn("day", dayofmonth(col("Date")))

  • Далее идет импорт библиотеки ML.
  • получить только необходимые столбцы
dffinal = df2[["year","month", "day", "Confirmed", "Deaths", "Recovered"]]
  • Spark ML начинается здесь
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ["year","month", "day", "Confirmed", "Recovered"], outputCol = 'features')
mldf = vectorAssembler.transform(dffinal)
mldf = mldf.select(['features', 'Deaths'])
mldf.show(3)
  • Разделите набор обучающих и тестовых данных
splits = mldf.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
  • Настроить модель
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='Deaths', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
  • Распечатайте точность модели
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
  • Оценить с помощью тестовых данных
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","Deaths","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="Deaths",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
  • отображать результаты
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
  • теперь запускать прогнозы
predictions = lr_model.transform(test_df)
predictions.select("prediction","Deaths","features").show()
  • помните, что вывод модели не является нашей целью в этом уроке. Просто чтобы показать вам процесс.
  • Сохраните конвейер и запустите отладку

Первоначально опубликовано на https://github.com.