В этой статье мы рассмотрим библиотеку Python pyspark-easy и то, как ее можно использовать при разработке Pyspark.

Введение

Spark — это унифицированный аналитический механизм для крупномасштабной обработки данных. Он предоставляет высокоуровневые API-интерфейсы на Scala, Java, Python и R, а также оптимизированный движок, поддерживающий общие графы вычислений для анализа данных. Он также поддерживает богатый набор инструментов более высокого уровня, включая Spark SQL для SQL и DataFrames, MLlib для машинного обучения, GraphX ​​для обработки графов и Structured Streaming для потоковой обработки.

PySpark

PySpark — это интерфейс для Apache Spark в Python. Он не только позволяет писать приложения Spark с использованием API-интерфейсов Python, но также предоставляет оболочку PySpark для интерактивного анализа ваших данных в распределенной среде.

pyspark-легко

pyspark-easy — это библиотека Python, позволяющая быстро получить представление о Spark DataFrame. Он также включен в функции и методы, призванные помочь специалистам по данным в разработке их моделей, устраняя необходимость выполнения повторяющихся задач.

Простая установка pyspark

pip install pyspark-easy

Скрыть ход выполнения консоли Spark

Скройте ход выполнения консоли искры, чтобы сведения о ходе выполнения консоли не нарушали выходные данные.

spark.ui.showConsoleProgress = Ложь

Это можно добавить в сеанс искры следующим образом.

spark = SparkSession
      .builder
      .master("local[*]")
      .appName("myApp")
      .config("spark.ui.showConsoleProgress", "false")

И. Краткая информация о кадрах данных

Исследуйте свой фрейм данных Spark с помощью всего двух строк кода, используя «pyspark-easy», который дает следующую статистику:

  1. Количество наблюдений/строк.
  2. Число столбцов.
  3. Количество и процент повторяющихся строк, если таковые имеются.
  4. Количество столбцов по типу данных и список этих столбцов.
  5. Сводная статистика по числовым столбцам - количество, среднее значение, стандартное отклонение, минимум, максимум, 25%, 50% и 75% процентиль. Это необязательный аргумент. Отображает только 5 столбцов статистики в строке, чтобы пользователю было проще просматривать результаты.
  6. Количество столбцов с пропущенными значениями.
  7. Список столбцов с отсутствующими значениями вместе с нет. отсутствующих строк и процент отсутствующих строк.

Набор данных можно скачать с Kaggle. Однако для этой статьи набор данных изменен.

Давайте посмотрим, как pyspark-easy творит чудеса:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-easy').getOrCreate()
df = spark.read.csv('bank.csv', header = True, inferSchema = True)
#import the library and pass the dataframe
from pyspark_easy import pyspark_easy
py=pyspark_easy(dataframe)
py.summary()

Результаты:

-----------------------------------+----------+
| The number of observations/rows : | 22312689 |
+-----------------------------------+----------+
| the number of variables/columns : | 10       |
+-----------------------------------+----------+
| No of Duplicate rows :            | 0        |
+-----------------------------------+----------+
| % of Duplicate rows :             | 0        |
+-----------------------------------+----------+

The number of string columns are : 6


The string columns are :
+-------------------+--------+
|      Columns      | Types  |
+===================+========+
| business_dt       | string |
+-------------------+--------+
| branchid          | string |
+-------------------+--------+
| customerno        | string |
+-------------------+--------+
| phone             | string |
+-------------------+--------+
| customername      | string |
+-------------------+--------+
| customerid        | string |
+-------------------+--------+



The number of numerical columns are : 3


The numerical columns are :
+--------------------+--------+
|      Columns       | Types  |
+====================+========+
| id                 | bigint |
+--------------------+--------+
| amount             | bigint |
+--------------------+--------+
| average_bal        | int    |
+--------------------+--------+



The number of date/time columns are : 1


The date/time columns are :
+-----------+-----------+
|  Columns  |   Types   |
+===========+===========+
| updatedon | timestamp |
+-----------+-----------+


Number of columns with missing values are : 3


The columns with missing values are :


+-------------------+-----------------------+---------------------+
|      Columns      | No. of missing values | % of missing values |
+===================+=======================+=====================+
| customername      | 64029                 | 0.290               |
+-------------------+-----------------------+---------------------+
| customerid        | 11                    | 0                   |
+-------------------+-----------------------+---------------------+
| id                | 11                    | 0                   |
+-------------------+-----------------------+---------------------+

По умолчанию сводная статистика отключена. Включение «summary_stats=’Y’» приводит к приведенной ниже статистике вместе с приведенными выше результатами.

py.summary(summary_stats='Y')
+-------+------------------+------------------+------------------+
|summary|               age|           balance|               day|
+-------+------------------+------------------+------------------+
|  count|             11162|             11162|             11162|
|   mean|41.231947679627304|1528.5385235620856|15.658036194230425|
| stddev|11.913369192215518| 3225.413325946149| 8.420739541006462|
|    min|                18|             -6847|                 1|
|    25%|                32|               122|                 8|
|    50%|                39|               550|                15|
|    75%|                49|              1708|                22|
|    max|                95|             81204|                31|
+-------+------------------+------------------+------------------+

II. Оценка модели

Оцените модель всего двумя строками кода. Приведенная ниже функция «pyspark_model_eval» генерирует почти все необходимые показатели и графики. Он поддерживает бинарные и многоклассовые модели классификации.

from pyspark_easy import pyspark_model_eval
model_res=pyspark_model_eval(model,predicted_df)
model_res.results()

Параметры:

model — обученная модель
predicted_df — кадр данных, содержащий метку и прогнозы.

Мы можем обучить модель бинарной классификации с помощью «bank.csv» и оценить результаты с помощью pyspark-easy.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('bank.csv', header = True, inferSchema = True)
# Preprocessing 
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssemblercategoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
stages += [label_stringIdx]numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
# Build pipelines
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
# Model
train, test = df.randomSplit([0.8, 0.2], seed = 2021)
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
model = lr.fit(train)
predictions = model.transform(test)

Теперь мы передаем модель и DataFrame, содержащие прогнозы, в «pyspark_model_eval».

from pyspark_easy import pyspark_model_eval
model_res=pyspark_model_eval(model,predictions)
model_res.results()

Результаты:

The Train vs Test evaluation Metrics are :
+--------------------+--------+--------+
|      Metrics       | Train  |  Test  |
+====================+========+========+
| AreaUnderROC       | 88.430 | 79.980 |
+--------------------+--------+--------+
| Accuracy           | 80.070 | 80.110 |
+--------------------+--------+--------+
| Weighted Precision | 80.110 | 80.630 |
+--------------------+--------+--------+
| Weighted Recall    | 80.070 | 80.110 |
+--------------------+--------+--------+
| Weighted F1 Score  | 80.010 | 79.990 |
+--------------------+--------+--------+
Note: The Weighted metrics - calculated by label then it is averaged

The binary classification metrics for test data are:
+-----------+-----------+
|  Metrics  | Test Data |
+===========+===========+
| Precision | 84.560    |
+-----------+-----------+
| Recall    | 72.790    |
+-----------+-----------+
| F1 Score  | 78.240    |
+-----------+-----------+
The classification report follows :

III. Поиск имени столбца

Поиск столбца с указанной подстрокой в ​​схеме, списке схем или во всей базе данных (Hive/Impala). Для этого не требуется доступ к метахранилищу.

from pyspark_easy import column_search
column_search(spark_session,substring, *schemas)

Параметры:

spark_session — объект искрового сеанса
substring — найти все столбцы, содержащие эту строку . Это оператор LIKE.
*схемы — необязательный аргумент. Если оставить пустым, выполняется поиск по всей базе данных. Для поиска по схемам передайте имена схем в список. ['схема1','схема2']. Это оператор LIKE.

В строке ниже выполняется поиск по всей базе данных: -

column_search(spark,'avg')

Результаты:

Столбцы, найденные в схеме «schema1» в таблице «table1»: {‘account_bal_avg’}
Столбцы, найденные в схеме «schema2» в таблице «table2»: {‘customer_avg_balance, last_month_avg_deposit’}

IV. Генератор дат

Наиболее распространенным требованием в любом проекте разработки Spark является разработка функций. Разработка функций — это навык, которым должен обладать каждый специалист по данным, особенно в случае функций окна даты. pyspark-easy помогает создавать функции, связанные с датой скользящего окна, для учета прошлых значений в нескольких строках кода.

Чтобы сгенерировать даты временных рядов по месяцам:

from pyspark_easy import dates_generator
dates=dates_generator(date,column, backward,*forward)

Параметры:

Дата — строка даты.
Столбец — строка, имя столбца.
Назад — это третий аргумент. Количество месяцев, прошедших с Date. *Вперед — по умолчанию равно 0. Количество месяцев в будущем с Даты.

dates_generator('2020-03-01','customer_average_balance',12,0)

Результаты:

[['2021-02-01', '2021-02-28', 'customer_average_balance_b1'],
 ['2021-01-01', '2021-01-31', 'customer_average_balance_b2'],
 ['2020-12-01', '2020-12-31', 'customer_average_balance_b3'],
 ['2020-11-01', '2020-11-30', 'customer_average_balance_b4'],
 ['2020-10-01', '2020-10-31', 'customer_average_balance_b5'],
 ['2020-09-01', '2020-09-30', 'customer_average_balance_b6'],
 ['2020-08-01', '2020-08-31', 'customer_average_balance_b7'],
 ['2020-07-01', '2020-07-31', 'customer_average_balance_b8'],
 ['2020-06-01', '2020-06-30', 'customer_average_balance_b9'],
 ['2020-05-01', '2020-05-31', 'customer_average_balance_b10'],
 ['2020-04-01', '2020-04-30', 'customer_average_balance_b11'],
 ['2020-03-01', '2020-03-31', 'customer_average_balance_b12']]

Он создает вложенный список. В каждом подсписке есть три аргумента, первый из которых является начальной датой месяца, затем идет конечная дата месяца и имя столбца с суффиксом «_b1», где «b» в «_b1» указывает назад.

Спасибо за чтение! Я разрабатываю дополнительные утилиты, чтобы упростить жизнь специалистам по данным, и собираюсь написать больше статей в будущем. Это мой гитхаб: https://github.com/naresh-datanut/pyspark-easy. Жизнь более красочна, когда помогаешь другим.