This is an end-to-end demo showing how Spark, dedicated SQL pools, and machine learning come together in Azure Synapse.
Use case
- Data engineering and ETL using dedicated SQL pools
- Data engineering and ETL using Synapse Spark
- Demonstrate how to invoke a pipeline
- Show how to orchestrate the process end to end
Steps
- End-to-end architecture
- Demonstrate how we can combine Spark, SQL, and machine learning at scale
- Explain the flow from start to finish
- Resume activity in the pipeline to start the dedicated SQL pool
- Use an HTTP (Web) activity
- Method to use: POST (see the sketch after the URL below)
https://management.azure.com/subscriptions/subscriptionid/resourceGroups/resourcegroup/providers/Microsoft.Synapse/workspaces/workspacename/sqlPools/dedicatedpoolname/resume?api-version=2019-06-01-preview
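The Web/HTTP activity simply issues this management REST call with an Azure AD bearer token against the URL above. Below is a minimal Python sketch of the same call (not the pipeline activity itself), assuming the azure-identity and requests packages are available and the subscription, resource group, workspace, and pool placeholders are replaced with real values; the helper name set_sql_pool_state is made up for illustration, and the same helper also covers the pause call later in this walkthrough.

# Hypothetical helper: resume or pause a dedicated SQL pool through the same
# management endpoint that the pipeline's Web activity calls. Placeholder names only.
import requests
from azure.identity import DefaultAzureCredential

def set_sql_pool_state(subscription_id, resource_group, workspace, pool, action):
    # action is "resume" or "pause"
    token = DefaultAzureCredential().get_token("https://management.azure.com/.default").token
    url = (
        f"https://management.azure.com/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}/providers/Microsoft.Synapse"
        f"/workspaces/{workspace}/sqlPools/{pool}/{action}"
        "?api-version=2019-06-01-preview"
    )
    response = requests.post(url, headers={"Authorization": f"Bearer {token}"})
    response.raise_for_status()
    return response.status_code  # 200 or 202 when the state change is accepted

# Example (placeholders, not real names):
# set_sql_pool_state("subscriptionid", "resourcegroup", "workspacename", "dedicatedpoolname", "resume")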
- Now clear the aggregate table so that new data can be loaded
- I use a stored procedure to clear the data
DROP PROCEDURE [dbo].[dropdailyaggr]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROC [dbo].[dropdailyaggr] AS
Drop Table [wwi].[dailyaggr]
GO
- Sample nycyellow ETL/data engineering pipeline using PySpark
- Using PySpark
from azureml.opendatasets import NycTlcYellow

data = NycTlcYellow()
data_df = data.to_spark_dataframe()
# Display 10 rows
display(data_df.limit(10))

from pyspark.sql.functions import *
from pyspark.sql import *

df1 = data_df.withColumn("Date", (col("tpepPickupDateTime").cast("date")))
display(df1)

df2 = (df1.withColumn("year", year(col("date")))
          .withColumn("month", month(col("date")))
          .withColumn("day", dayofmonth(col("date")))
          .withColumn("hour", hour(col("date"))))

dfgrouped = (df2.groupBy("year", "month")
                .agg(sum("fareAmount").alias("Total"), count("vendorID").alias("Count"))
                .sort(asc("year"), asc("month")))

dfgrouped.repartition(1).write.mode('overwrite').parquet("/dailyaggr/parquet/dailyaggr.parquet")

dfgrouped.repartition(1).write.mode('overwrite').option("header", "true").csv("/dailyaggrcsv/csv/dailyaggr.csv")

df2.createOrReplaceTempView("nycyellow")
%%sql
select * from nycyellow limit 100

%%sql
select year(cast(tpepPickupDateTime as timestamp)) as tsYear,
       month(cast(tpepPickupDateTime as timestamp)) as tsMonth,
       day(cast(tpepPickupDateTime as timestamp)) as tsDay,
       hour(cast(tpepPickupDateTime as timestamp)) as tsHour,
       avg(totalAmount) as avgTotal,
       avg(fareAmount) as avgFare
from nycyellow
group by tsYear, tsMonth, tsDay, tsHour
order by tsYear, tsMonth, tsDay, tsHour

%%sql
DROP TABLE dailyaggr

%%sql
CREATE TABLE dailyaggr
COMMENT 'This table is created with existing data'
AS
select year(cast(tpepPickupDateTime as timestamp)) as tsYear,
       month(cast(tpepPickupDateTime as timestamp)) as tsMonth,
       day(cast(tpepPickupDateTime as timestamp)) as tsDay,
       hour(cast(tpepPickupDateTime as timestamp)) as tsHour,
       avg(totalAmount) as avgTotal,
       avg(fareAmount) as avgFare
from nycyellow
group by tsYear, tsMonth, tsDay, tsHour
order by tsYear, tsMonth, tsDay, tsHour
dailyaggr = spark.sql("SELECT tsYear, tsMonth, tsDay, tsHour, avgTotal FROM dailyaggr")
display(dailyaggr)
%%spark
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
from pyspark.ml.regression import LinearRegression
%%pyspark
import pyspark
print(pyspark.__version__)
%%spark
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dailyaggr = spark.sql("SELECT tsYear, tsMonth, tsDay, tsHour, avgTotal FROM dailyaggr")
val featureCols = Array("tsYear", "tsMonth", "tsDay", "tsHour")
val assembler: org.apache.spark.ml.feature.VectorAssembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val assembledDF = assembler.setHandleInvalid("skip").transform(dailyaggr)
val assembledFinalDF = assembledDF.select("avgTotal", "features")
%%spark
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

%%spark
dailyaggr.repartition(2).write.synapsesql("accsynapsepools.wwi.dailyaggr", Constants.INTERNAL)
%%spark
import org.apache.spark.ml.feature.Normalizer
val normalizedDF = new Normalizer().setInputCol("features").setOutputCol("normalizedFeatures").transform(assembledFinalDF)

%%spark
val normalizedDF1 = normalizedDF.na.drop()

%%spark
val Array(trainingDS, testDS) = normalizedDF1.randomSplit(Array(0.7, 0.3))

%%spark
import org.apache.spark.ml.regression.LinearRegression
// Create a LinearRegression instance. This instance is an Estimator.
val lr = new LinearRegression().setLabelCol("avgTotal").setMaxIter(100)
// Print out the parameters, documentation, and any default values.
println(s"Linear Regression parameters:\n ${lr.explainParams()}\n")
// Learn a Linear Regression model. This uses the parameters stored in lr.
val lrModel = lr.fit(trainingDS)
// Make predictions on test data using the Transformer.transform() method.
// LinearRegression.transform will only use the 'features' column.
val lrPredictions = lrModel.transform(testDS)

%%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
println("\nPredictions : ")
lrPredictions.select($"avgTotal".cast(IntegerType), $"prediction".cast(IntegerType)).orderBy(abs($"prediction" - $"avgTotal")).distinct.show(15)

%%spark
import org.apache.spark.ml.evaluation.RegressionEvaluator
val evaluator_r2 = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("avgTotal").setMetricName("r2")
// As the name implies, isLargerBetter returns whether a larger value is better for evaluation.
val isLargerBetter: Boolean = evaluator_r2.isLargerBetter
println("Coefficient of determination = " + evaluator_r2.evaluate(lrPredictions))

%%spark
// Evaluate the results. Calculate Root Mean Square Error.
val evaluator_rmse = new RegressionEvaluator().setPredictionCol("prediction").setLabelCol("avgTotal").setMetricName("rmse")
// As the name implies, isLargerBetter returns whether a larger value is better for evaluation.
val isLargerBetter1: Boolean = evaluator_rmse.isLargerBetter
println("Root Mean Square Error = " + evaluator_rmse.evaluate(lrPredictions))

%%spark
val dailyaggrdf = spark.read.synapsesql("accsynapsepools.wwi.dailyaggr")

%%spark
display(dailyaggrdf)

%%spark
dailyaggrdf.count()
- NYC taxi with holiday data, in Scala code
// Load nyc green taxi trip records from azure open dataset
val blob_account_name = "azureopendatastorage"
val nyc_blob_container_name = "nyctlc"
val nyc_blob_relative_path = "green"
val nyc_blob_sas_token = ""

val nyc_wasbs_path = f"wasbs://$nyc_blob_container_name@$blob_account_name.blob.core.windows.net/$nyc_blob_relative_path"
spark.conf.set(f"fs.azure.sas.$nyc_blob_container_name.$blob_account_name.blob.core.windows.net", nyc_blob_sas_token)

val nyc_tlc = spark.read.parquet(nyc_wasbs_path)

// Filter data by time range
import java.sql.Timestamp
import org.joda.time.DateTime

val end_date = new Timestamp(DateTime.parse("2018-06-06").getMillis)
val start_date = new Timestamp(DateTime.parse("2018-05-01").getMillis)

val nyc_tlc_df = nyc_tlc.filter((nyc_tlc("lpepPickupDatetime") >= start_date) && (nyc_tlc("lpepPickupDatetime") <= end_date))
nyc_tlc_df.show(5, truncate = false)

// Extract month, day of month, and day of week from pickup datetime and add a static column for the country code to join holiday data.
import org.apache.spark.sql.functions._

val nyc_tlc_df_expand = (
    nyc_tlc_df.withColumn("datetime", to_date(col("lpepPickupDatetime")))
              .withColumn("month_num", month(col("lpepPickupDatetime")))
              .withColumn("day_of_month", dayofmonth(col("lpepPickupDatetime")))
              .withColumn("day_of_week", dayofweek(col("lpepPickupDatetime")))
              .withColumn("hour_of_day", hour(col("lpepPickupDatetime")))
              .withColumn("country_code", lit("US"))
)

// Remove unused columns from nyc green taxi data
val nyc_tlc_df_clean = nyc_tlc_df_expand.drop(
    "lpepDropoffDatetime", "puLocationId", "doLocationId", "pickupLongitude", "pickupLatitude",
    "dropoffLongitude", "dropoffLatitude", "rateCodeID", "storeAndFwdFlag", "paymentType",
    "fareAmount", "extra", "mtaTax", "improvementSurcharge", "tollsAmount", "ehailFee", "tripType"
)

// Display 5 rows
nyc_tlc_df_clean.show(5, truncate = false)

// Load public holidays data from azure open dataset
val hol_blob_container_name = "holidaydatacontainer"
val hol_blob_relative_path = "Processed"
val hol_blob_sas_token = ""

val hol_wasbs_path = f"wasbs://$hol_blob_container_name@$blob_account_name.blob.core.windows.net/$hol_blob_relative_path"
spark.conf.set(f"fs.azure.sas.$hol_blob_container_name.$blob_account_name.blob.core.windows.net", hol_blob_sas_token)

val hol_raw = spark.read.parquet(hol_wasbs_path)

// Filter data by time range
val hol_df = hol_raw.filter((hol_raw("date") >= start_date) && (hol_raw("date") <= end_date))

// Display 5 rows
// hol_df.show(5, truncate = false)

val hol_df_clean = (
    hol_df.withColumnRenamed("countryRegionCode", "country_code")
          .withColumn("datetime", to_date(col("date")))
)
hol_df_clean.show(5, truncate = false)

// Enrich taxi data with holiday data
val nyc_taxi_holiday_df = nyc_tlc_df_clean.join(hol_df_clean, Seq("datetime", "country_code"), "left")
nyc_taxi_holiday_df.show(5, truncate = false)

// Create a temp table and filter out non empty holiday rows
nyc_taxi_holiday_df.createOrReplaceTempView("nyc_taxi_holiday_df")
val result = spark.sql("SELECT * from nyc_taxi_holiday_df WHERE holidayName is NOT NULL")
result.show(5, truncate = false)
// Load weather data from azure open dataset
val weather_blob_container_name = "isdweatherdatacontainer"
val weather_blob_relative_path = "ISDWeather/"
val weather_blob_sas_token = ""

val weather_wasbs_path = f"wasbs://$weather_blob_container_name@$blob_account_name.blob.core.windows.net/$weather_blob_relative_path"
spark.conf.set(f"fs.azure.sas.$weather_blob_container_name.$blob_account_name.blob.core.windows.net", weather_blob_sas_token)

val isd = spark.read.parquet(weather_wasbs_path)

// Display 5 rows
// isd.show(5, truncate = false)

// Filter data by time range
val isd_df = isd.filter((isd("datetime") >= start_date) && (isd("datetime") <= end_date))

// Display 5 rows
isd_df.show(5, truncate = false)

val weather_df = (
    isd_df.filter(isd_df("latitude") >= "40.53")
          .filter(isd_df("latitude") <= "40.88")
          .filter(isd_df("longitude") >= "-74.09")
          .filter(isd_df("longitude") <= "-73.72")
          .filter(isd_df("temperature").isNotNull)
          .withColumnRenamed("datetime", "datetime_full")
)

// Remove unused columns
val weather_df_clean = weather_df.drop("usaf", "wban", "longitude", "latitude").withColumn("datetime", to_date(col("datetime_full")))
// weather_df_clean.show(5, truncate = false)

val weather_df_grouped = (
    weather_df_clean.groupBy('datetime).agg(
        mean('snowDepth) as "avg_snowDepth",
        max('precipTime) as "max_precipTime",
        mean('temperature) as "avg_temperature",
        max('precipDepth) as "max_precipDepth"
    )
)
weather_df_grouped.show(5, truncate = false)

// Enrich taxi data with weather
val nyc_taxi_holiday_weather_df = nyc_taxi_holiday_df.join(weather_df_grouped, Seq("datetime"), "left")
nyc_taxi_holiday_weather_df.cache()

nyc_taxi_holiday_weather_df.show(5, truncate = false)

// Run the describe() function on the new dataframe to see summary statistics for each field.
display(nyc_taxi_holiday_weather_df.describe())

nyc_taxi_holiday_weather_df.count

// Remove invalid rows where the taxi fare or tip is not greater than 0
val final_df = (
    nyc_taxi_holiday_weather_df.filter(nyc_taxi_holiday_weather_df("tipAmount") > 0)
                               .filter(nyc_taxi_holiday_weather_df("totalAmount") > 0)
)

spark.sql("DROP TABLE IF EXISTS NYCTaxi.nyc_taxi_holiday_weather")

spark.sql("DROP DATABASE IF EXISTS NYCTaxi")
spark.sql("CREATE DATABASE NYCTaxi")
spark.sql("USE NYCTaxi")

final_df.write.saveAsTable("nyc_taxi_holiday_weather")
val final_results = spark.sql("SELECT COUNT(*) FROM nyc_taxi_holiday_weather")
final_results.show(5, truncate = false)
- Pause activity in the pipeline to pause the dedicated SQL pool
- Use an HTTP (Web) activity
- Method to use: POST (same helper as the resume step; see below)
https://management.azure.com/subscriptions/subscriptionid/resourceGroups/resourcegroup/providers/Microsoft.Synapse/workspaces/workspacename/sqlPools/dedicatedpoolname/pause?api-version=2019-06-01-preview
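This is the same management endpoint as the resume step; only the final segment of the URL changes from resume to pause. With the hypothetical helper sketched in the resume step:

# Pause the pool using the helper sketched earlier (placeholder names).
set_sql_pool_state("subscriptionid", "resourcegroup", "workspacename", "dedicatedpoolname", "pause")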
- Azure Synapse Spark machine learning
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil import parser
from pyspark.sql.functions import unix_timestamp, date_format, col, when
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from azureml.opendatasets import NycTlcYellow

end_date = parser.parse('2018-06-06')
start_date = parser.parse('2018-05-01')

nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
filtered_df = nyc_tlc.to_spark_dataframe()

# To make development easier, faster, and less expensive, down sample for now
sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)

#sampled_taxi_df.show(5)
display(sampled_taxi_df)

sampled_taxi_df.createOrReplaceTempView("nytaxi")

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5) & (sampled_taxi_df.paymentType.isin({"1", "2"}))\
                                )

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                , 'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped'\
                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20), "Night")\
                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                .otherwise(0).alias('trafficTimeBins')
                                )\
                        .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

# Since the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new dataframe that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

# Decide on the split between training and testing data from the dataframe
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

# Split the dataframe into test and training dataframes
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it is another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluate using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label", "prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(), modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
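Because the trained pipeline model was saved above, it can be reloaded in a later session and reused for scoring. A minimal sketch, assuming the logRegDirfilename path written by the training cell is still present in the workspace storage:

# Reload the persisted pipeline model and score a dataframe with the same schema
# as the training data (logRegDirfilename comes from the training cell above).
from pyspark.ml import PipelineModel

loadedModel = PipelineModel.load(logRegDirfilename)
reloaded_predictions = loadedModel.transform(test_data_df)
reloaded_predictions.select("tipped", "prediction").show(5)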
Originally published at https://github.com.