В этой статье мы рассмотрим библиотеку Python pyspark-easy и то, как ее можно использовать при разработке Pyspark.
Введение
Spark — это унифицированный аналитический механизм для крупномасштабной обработки данных. Он предоставляет высокоуровневые API-интерфейсы на Scala, Java, Python и R, а также оптимизированный движок, поддерживающий общие графы вычислений для анализа данных. Он также поддерживает богатый набор инструментов более высокого уровня, включая Spark SQL для SQL и DataFrames, MLlib для машинного обучения, GraphX для обработки графов и Structured Streaming для потоковой обработки.
PySpark
PySpark — это интерфейс для Apache Spark в Python. Он не только позволяет писать приложения Spark с использованием API-интерфейсов Python, но также предоставляет оболочку PySpark для интерактивного анализа ваших данных в распределенной среде.
pyspark-легко
pyspark-easy — это библиотека Python, позволяющая быстро получить представление о Spark DataFrame. Он также включен в функции и методы, призванные помочь специалистам по данным в разработке их моделей, устраняя необходимость выполнения повторяющихся задач.
Простая установка pyspark
pip install pyspark-easy
Скрыть ход выполнения консоли Spark
Скройте ход выполнения консоли искры, чтобы сведения о ходе выполнения консоли не нарушали выходные данные.
spark.ui.showConsoleProgress
= Ложь
Это можно добавить в сеанс искры следующим образом.
spark = SparkSession
.builder
.master("local[*]")
.appName("myApp")
.config("spark.ui.showConsoleProgress", "false")
И. Краткая информация о кадрах данных
Исследуйте свой фрейм данных Spark с помощью всего двух строк кода, используя «pyspark-easy», который дает следующую статистику:
- Количество наблюдений/строк.
- Число столбцов.
- Количество и процент повторяющихся строк, если таковые имеются.
- Количество столбцов по типу данных и список этих столбцов.
- Сводная статистика по числовым столбцам - количество, среднее значение, стандартное отклонение, минимум, максимум, 25%, 50% и 75% процентиль. Это необязательный аргумент. Отображает только 5 столбцов статистики в строке, чтобы пользователю было проще просматривать результаты.
- Количество столбцов с пропущенными значениями.
- Список столбцов с отсутствующими значениями вместе с нет. отсутствующих строк и процент отсутствующих строк.
Набор данных можно скачать с Kaggle. Однако для этой статьи набор данных изменен.
Давайте посмотрим, как pyspark-easy творит чудеса:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-easy').getOrCreate()
df = spark.read.csv('bank.csv', header = True, inferSchema = True)
#import the library and pass the dataframe
from pyspark_easy import pyspark_easy
py=pyspark_easy(dataframe)
py.summary()
Результаты:
-----------------------------------+----------+
| The number of observations/rows : | 22312689 |
+-----------------------------------+----------+
| the number of variables/columns : | 10 |
+-----------------------------------+----------+
| No of Duplicate rows : | 0 |
+-----------------------------------+----------+
| % of Duplicate rows : | 0 |
+-----------------------------------+----------+
The number of string columns are : 6
The string columns are :
+-------------------+--------+
| Columns | Types |
+===================+========+
| business_dt | string |
+-------------------+--------+
| branchid | string |
+-------------------+--------+
| customerno | string |
+-------------------+--------+
| phone | string |
+-------------------+--------+
| customername | string |
+-------------------+--------+
| customerid | string |
+-------------------+--------+
The number of numerical columns are : 3
The numerical columns are :
+--------------------+--------+
| Columns | Types |
+====================+========+
| id | bigint |
+--------------------+--------+
| amount | bigint |
+--------------------+--------+
| average_bal | int |
+--------------------+--------+
The number of date/time columns are : 1
The date/time columns are :
+-----------+-----------+
| Columns | Types |
+===========+===========+
| updatedon | timestamp |
+-----------+-----------+
Number of columns with missing values are : 3
The columns with missing values are :
+-------------------+-----------------------+---------------------+
| Columns | No. of missing values | % of missing values |
+===================+=======================+=====================+
| customername | 64029 | 0.290 |
+-------------------+-----------------------+---------------------+
| customerid | 11 | 0 |
+-------------------+-----------------------+---------------------+
| id | 11 | 0 |
+-------------------+-----------------------+---------------------+
По умолчанию сводная статистика отключена. Включение «summary_stats=’Y’» приводит к приведенной ниже статистике вместе с приведенными выше результатами.
py.summary(
summary_stats='Y')
+-------+------------------+------------------+------------------+ |summary| age| balance| day| +-------+------------------+------------------+------------------+ | count| 11162| 11162| 11162| | mean|41.231947679627304|1528.5385235620856|15.658036194230425| | stddev|11.913369192215518| 3225.413325946149| 8.420739541006462| | min| 18| -6847| 1| | 25%| 32| 122| 8| | 50%| 39| 550| 15| | 75%| 49| 1708| 22| | max| 95| 81204| 31| +-------+------------------+------------------+------------------+
II. Оценка модели
Оцените модель всего двумя строками кода. Приведенная ниже функция «pyspark_model_eval» генерирует почти все необходимые показатели и графики. Он поддерживает бинарные и многоклассовые модели классификации.
from pyspark_easy import pyspark_model_eval
model_res=pyspark_model_eval(model,predicted_df)
model_res.results()
Параметры:
model — обученная модель
predicted_df — кадр данных, содержащий метку и прогнозы.
Мы можем обучить модель бинарной классификации с помощью «bank.csv» и оценить результаты с помощью pyspark-easy.
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('ml-bank').getOrCreate() df = spark.read.csv('bank.csv', header = True, inferSchema = True) # Preprocessing from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssemblercategoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome'] stages = []for categoricalCol in categoricalColumns: stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index') encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder]label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label') stages += [label_stringIdx]numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous'] assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") stages += [assembler] # Build pipelines from pyspark.ml import Pipeline pipeline = Pipeline(stages = stages) pipelineModel = pipeline.fit(df) df = pipelineModel.transform(df) selectedCols = ['label', 'features'] + cols df = df.select(selectedCols) # Model train, test = df.randomSplit([0.8, 0.2], seed = 2021) lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10) model = lr.fit(train) predictions = model.transform(test)
Теперь мы передаем модель и DataFrame, содержащие прогнозы, в «pyspark_model_eval».
from pyspark_easy import pyspark_model_eval model_res=pyspark_model_eval(
model,
predictions) model_res.results()
Результаты:
The Train vs Test evaluation Metrics are : +--------------------+--------+--------+ | Metrics | Train | Test | +====================+========+========+ | AreaUnderROC | 88.430 | 79.980 | +--------------------+--------+--------+ | Accuracy | 80.070 | 80.110 | +--------------------+--------+--------+ | Weighted Precision | 80.110 | 80.630 | +--------------------+--------+--------+ | Weighted Recall | 80.070 | 80.110 | +--------------------+--------+--------+ | Weighted F1 Score | 80.010 | 79.990 | +--------------------+--------+--------+ Note: The Weighted metrics - calculated by label then it is averaged
The binary classification metrics for test data are: +-----------+-----------+ | Metrics | Test Data | +===========+===========+ | Precision | 84.560 | +-----------+-----------+ | Recall | 72.790 | +-----------+-----------+ | F1 Score | 78.240 | +-----------+-----------+ The classification report follows :
III. Поиск имени столбца
Поиск столбца с указанной подстрокой в схеме, списке схем или во всей базе данных (Hive/Impala). Для этого не требуется доступ к метахранилищу.
from pyspark_easy import column_search
column_search(spark_session,substring, *schemas)
Параметры:
spark_session — объект искрового сеанса
substring — найти все столбцы, содержащие эту строку . Это оператор LIKE.
*схемы — необязательный аргумент. Если оставить пустым, выполняется поиск по всей базе данных. Для поиска по схемам передайте имена схем в список. ['схема1','схема2']. Это оператор LIKE.
В строке ниже выполняется поиск по всей базе данных: -
column_search(spark,'avg')
Результаты:
Столбцы, найденные в схеме «schema1» в таблице «table1»: {‘account_bal_avg’}
Столбцы, найденные в схеме «schema2» в таблице «table2»: {‘customer_avg_balance, last_month_avg_deposit’}
IV. Генератор дат
Наиболее распространенным требованием в любом проекте разработки Spark является разработка функций. Разработка функций — это навык, которым должен обладать каждый специалист по данным, особенно в случае функций окна даты. pyspark-easy помогает создавать функции, связанные с датой скользящего окна, для учета прошлых значений в нескольких строках кода.
Чтобы сгенерировать даты временных рядов по месяцам:
from pyspark_easy import dates_generator
dates=dates_generator(date,column, backward,*forward)
Параметры:
Дата — строка даты.
Столбец — строка, имя столбца.
Назад — это третий аргумент. Количество месяцев, прошедших с Date. *Вперед — по умолчанию равно 0. Количество месяцев в будущем с Даты.
dates_generator('2020-03-01','customer_average_balance',12,0)
Результаты:
[['2021-02-01', '2021-02-28', 'customer_average_balance_b1'],
['2021-01-01', '2021-01-31', 'customer_average_balance_b2'],
['2020-12-01', '2020-12-31', 'customer_average_balance_b3'],
['2020-11-01', '2020-11-30', 'customer_average_balance_b4'],
['2020-10-01', '2020-10-31', 'customer_average_balance_b5'],
['2020-09-01', '2020-09-30', 'customer_average_balance_b6'],
['2020-08-01', '2020-08-31', 'customer_average_balance_b7'],
['2020-07-01', '2020-07-31', 'customer_average_balance_b8'],
['2020-06-01', '2020-06-30', 'customer_average_balance_b9'],
['2020-05-01', '2020-05-31', 'customer_average_balance_b10'],
['2020-04-01', '2020-04-30', 'customer_average_balance_b11'],
['2020-03-01', '2020-03-31', 'customer_average_balance_b12']]
Он создает вложенный список. В каждом подсписке есть три аргумента, первый из которых является начальной датой месяца, затем идет конечная дата месяца и имя столбца с суффиксом «_b1», где «b» в «_b1» указывает назад.
Спасибо за чтение! Я разрабатываю дополнительные утилиты, чтобы упростить жизнь специалистам по данным, и собираюсь написать больше статей в будущем. Это мой гитхаб: https://github.com/naresh-datanut/pyspark-easy. Жизнь более красочна, когда помогаешь другим.