Перемещение данных, обработка данных, обучение модели с помощью Azure Synapse Analytics Workspace
Унифицированная единая платформа для управления жизненным циклом Data и Data Science для повышения производительности
Примечание
- Эта статья предназначена для демонстрации функциональности
Предпосылка
- Учетная запись Azure
- Рабочая область Azure Synapse Analytics
- База данных SQL Azure
- Хранилище Azure
- Загрузите данные Covid19 из kaggel и импортируйте в Azure SQL в качестве смоделированных данных из внешнего источника.
- Создайте одну таблицу как dbo.covid19data и загрузите данные
- Приведенный выше поток архитектуры состоит из 3 компонентов.
- Копировать действие для перемещения данных из внешнего источника и переноса в необработанную или входную зону.
- Далее следует действие потока данных для перетаскивания ETL/ELT для преобразования или обработки данных и перехода к окончательному варианту.
- Блокнот для запуска автоматизированного ML sdk с использованием spark для запуска модели машинного обучения
- Мы используем регрессию для прогнозирования смертей, происходящих из-за covid 19.
- Войдите в рабочую область Azure Synapse.
- Создать конвейер/интегрировать
- Перетаскивание копирования
- для источника выберите указанный выше SQL-сервер — (необходимо создать связанные службы)
- Затем создайте поток данных для обработки данных.
- Для примера я использую дельту, чтобы упростить CDC.
- Создайте источник с назначением действий копирования
- Мы можем возобновить связанную службу назначения действия по копированию самостоятельно.
- Теперь перетащите выделение
- Выбрать все столбцы
- Включите отладку и просмотрите результаты. Этот набор данных с открытым исходным кодом, поэтому нет PII
- Выберите столбцы
- См. отладку для проверки обработки данных.
- Теперь к машинному обучению
- Создать блокнот
- Получите доступ к вышеуказанному набору данных стока в качестве входных данных для моделирования.
- Создать блокнот
Код
- Загрузить данные
%%pyspark
df = spark.read.load('abfss://[email protected]/covid19aggroutput/*.parquet', format='parquet')
display(df.limit(10))
- Схема печати
df.printSchema
- импорт необходимо
from pyspark.sql.functions import *
from pyspark.sql import *
- преобразовать дату в тип данных даты
df1 = df.withColumn("Date", to_date("ObservationDate", "MM/dd/yyyy"))
display(df1)
- Создать новые столбцы
df2 = df1.withColumn("year", year(col("Date"))).withColumn("month", month(col("Date"))).withColumn("day", dayofmonth(col("Date")))
- Далее идет импорт библиотеки ML.
- получить только необходимые столбцы
dffinal = df2[["year","month", "day", "Confirmed", "Deaths", "Recovered"]]
- Spark ML начинается здесь
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ["year","month", "day", "Confirmed", "Recovered"], outputCol = 'features')
mldf = vectorAssembler.transform(dffinal)
mldf = mldf.select(['features', 'Deaths'])
mldf.show(3)
- Разделите набор обучающих и тестовых данных
splits = mldf.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
- Настроить модель
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='Deaths', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
- Распечатайте точность модели
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
- Оценить с помощью тестовых данных
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","Deaths","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="Deaths",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
- отображать результаты
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
- теперь запускать прогнозы
predictions = lr_model.transform(test_df)
predictions.select("prediction","Deaths","features").show()
- помните, что вывод модели не является нашей целью в этом уроке. Просто чтобы показать вам процесс.
- Сохраните конвейер и запустите отладку
Первоначально опубликовано на https://github.com.