Введение в управляемый Spark с помощью служб машинного обучения Azure
Предпосылки
- Учетная запись Azure
- Служба машинного обучения Azure
- Набор данных Титаник
Шаги управляемой искры
- Кулак перейти к ноутбукам
- Создать новый блокнот
- Рядом с пунктом «Выбор вычислений» нажмите «Создать» и выберите «Искровые вычисления AzureML».
- Теперь вернитесь к блокноту, и вы должны увидеть созданные вычисления. Обычно это занимает 3-5 минут
- Если вы хотите изменить сеанс, нажмите «Настроить сеанс» в левом нижнем углу блокнота.
- по умолчанию, когда я создал, это была искра 3.2
- Теперь давайте загрузим некоторые данные из внешнего файла
- Ниже был набор доступных данных с открытым исходным кодом
df = spark.read.option("header", "true").csv("wasbs://[email protected]/Titanic.csv")
- Выполним некоторую обработку данных
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
- Теперь давайте загрузим из набора данных машинного обучения Azure.
import azureml.core print(azureml.core.VERSION)
from azureml.core import Workspace, Dataset ws = Workspace.get(name='workspacename', subscription_id='xxxxxxx', resource_group='rgname') ds = Dataset.get_by_name(ws, "titanic") df = ds.to_spark_dataframe()
Машинное обучение
- Давайте теперь построим простую модель машинного обучения.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
- Давайте посмотрим статистику для dataframe
df.describe().show()
- Показать схему
df.printSchema()
- Давайте займемся инженерией данных
df.select("Survived","Pclass","Embarked").show()
- группировать и считать
df.groupBy("Sex","Survived").count().show()
- Теперь заполните значения NA
df = df.na.fill({"Embarked" : 'S'})
df = df.drop("Cabin")
- Создать новые значения столбцов
df = df.withColumn("Family_Size",col('SibSp')+col('Parch'))
- Индексировать значения
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in ["Sex","Embarked"]]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)
- Удалить ненужные столбцы
df = df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex")
- заполнить пустые значения
df = df.na.fill({"Age" : 10})
- Изменить столбец на строку на двойную
#Using withColumn() examples
df = df.withColumn("Age",df.Age.cast('double'))
df = df.withColumn("SibSp",df.SibSp.cast('double'))
df = df.withColumn("Parch",df.Parch.cast('double'))
df = df.withColumn("Fare",df.Fare.cast('double'))
df = df.withColumn("Pclass",df.Pclass.cast('double'))
df = df.withColumn("Survived",df.Survived.cast('double'))
- Характеризатор
feature = VectorAssembler(inputCols=df.columns[1:],outputCol="features")
feature_vector= feature.transform(df)
- Разделить данные поезда и теста
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)
- Логистическая регрессия
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
#Training algo
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
- метрический расчет
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
- Теперь дерево решений
from pyspark.ml.classification import DecisionTreeClassifier dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features") dt_model = dt.fit(trainingData) dt_prediction = dt_model.transform(testData) dt_prediction.select("prediction", "Survived", "features").show()
dt_accuracy = evaluator.evaluate(dt_prediction) print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy)) print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))
- еще одна модель для Gradient Boosted Tree
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "Survived", "features").show()
- Расчет метрики
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifie is = %g"% (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifie %g"% (1.0 - gbt_accuracy))
Оригинал статьи — Samples2022/managedspark1.md на главной · балакрешнан/Samples2022 (github.com)