Введение в управляемый Spark с помощью служб машинного обучения Azure

Предпосылки

  • Учетная запись Azure
  • Служба машинного обучения Azure
  • Набор данных Титаник

Шаги управляемой искры

  • Кулак перейти к ноутбукам
  • Создать новый блокнот
  • Рядом с пунктом «Выбор вычислений» нажмите «Создать» и выберите «Искровые вычисления AzureML».

  • Теперь вернитесь к блокноту, и вы должны увидеть созданные вычисления. Обычно это занимает 3-5 минут

  • Если вы хотите изменить сеанс, нажмите «Настроить сеанс» в левом нижнем углу блокнота.

  • по умолчанию, когда я создал, это была искра 3.2
  • Теперь давайте загрузим некоторые данные из внешнего файла
  • Ниже был набор доступных данных с открытым исходным кодом
df = spark.read.option("header", "true").csv("wasbs://[email protected]/Titanic.csv")
  • Выполним некоторую обработку данных
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

  • Теперь давайте загрузим из набора данных машинного обучения Azure.
import azureml.core
print(azureml.core.VERSION)
from azureml.core import Workspace, Dataset
ws = Workspace.get(name='workspacename', subscription_id='xxxxxxx', resource_group='rgname')
ds = Dataset.get_by_name(ws, "titanic")
df = ds.to_spark_dataframe()

Машинное обучение

  • Давайте теперь построим простую модель машинного обучения.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
  • Давайте посмотрим статистику для dataframe
df.describe().show()
  • Показать схему
df.printSchema()
  • Давайте займемся инженерией данных
df.select("Survived","Pclass","Embarked").show()
  • группировать и считать
df.groupBy("Sex","Survived").count().show()

  • Теперь заполните значения NA
df = df.na.fill({"Embarked" : 'S'})
df = df.drop("Cabin")
  • Создать новые значения столбцов
df = df.withColumn("Family_Size",col('SibSp')+col('Parch'))
  • Индексировать значения
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in ["Sex","Embarked"]]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)
  • Удалить ненужные столбцы
df = df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex")
  • заполнить пустые значения
df = df.na.fill({"Age" : 10})
  • Изменить столбец на строку на двойную
#Using withColumn() examples
df = df.withColumn("Age",df.Age.cast('double'))
df = df.withColumn("SibSp",df.SibSp.cast('double'))
df = df.withColumn("Parch",df.Parch.cast('double'))
df = df.withColumn("Fare",df.Fare.cast('double'))
df = df.withColumn("Pclass",df.Pclass.cast('double'))
df = df.withColumn("Survived",df.Survived.cast('double'))
  • Характеризатор
feature = VectorAssembler(inputCols=df.columns[1:],outputCol="features")
feature_vector= feature.transform(df)
  • Разделить данные поезда и теста
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)
  • Логистическая регрессия
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
#Training algo
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
  • метрический расчет
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
  • Теперь дерево решений
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "Survived", "features").show()
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))
  • еще одна модель для Gradient Boosted Tree
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "Survived", "features").show()
  • Расчет метрики
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifie is = %g"% (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifie %g"% (1.0 - gbt_accuracy))

Оригинал статьи — Samples2022/managedspark1.md на главной · балакрешнан/Samples2022 (github.com)