Реализация питонических статистических функций на фреймах данных Spark

У меня есть очень большие наборы данных в фреймах данных Spark, которые распределены по узлам. Я могу делать простые статистические данные, такие как mean, stdev, skewness, kurtosis и т. Д., Используя библиотеки Spark pyspark.sql.functions.

Если я хочу использовать расширенные статистические тесты, такие как Jarque-Bera (JB) или Shapiro-Wilk (SW) и т. Д., Я использую библиотеки python, такие как scipy, поскольку в стандартных библиотеках pyspark apache их нет. Но для этого мне нужно преобразовать фрейм данных Spark в pandas, что означает принудительную передачу данных в главный узел следующим образом:

import scipy.stats as stats
pandas_df=spark_df.toPandas()
JBtest=stats.jarque_bera(pandas_df)
SWtest=stats.shapiro(pandas_df)

У меня есть несколько функций, и каждый идентификатор функции соответствует набору данных, для которого я хочу выполнить тестовую статистику.

У меня вопрос:

Есть ли способ применить эти питонические функции к фрейму данных Spark, пока данные все еще распределяются по узлам, или мне нужно создать свои собственные статистические функции теста JB / SW в искре?

Спасибо за любую ценную информацию


person thentangler    schedule 13.09.2020    source источник
comment
Отвечает ли это на ваш вопрос?   -  person werner    schedule 13.09.2020


Ответы (1)


У вас должна быть возможность определить векторизованную пользовательскую функцию, которая является оболочкой для функции Pandas (https://databricks.com/blog/2017/10/30/introduction-vectorized-udfs-for-pyspark.html), например:

from pyspark.sql.functions import pandas_udf, PandasUDFType
import scipy.stats as stats

@pandas_udf('double', PandasUDFType.SCALAR)

def vector_jarque_bera(x):
    return stats.jarque_bera(x)

# test:
spark_df.withColumn('y', vector_jarque_bera(df['x']))

Обратите внимание, что столбец векторизованной функции принимает столбец в качестве аргумента и возвращает столбец.

(Nb. Декоратор @pandas_udf - это то, что преобразует функцию Pandas, определенную прямо под ним, в векторизованную функцию. Каждый элемент возвращенного вектора сам по себе является скаляром, поэтому передается аргумент PandasUDFType.SCALAR.)

person AltShift    schedule 13.09.2020
comment
Спасибо за этот ответ. Когда я попробовал это, я получил следующую ошибку: RuntimeError: Result vector from pandas_udf was not the required length: expected 10000, got 2 Есть ли минимально необходимая длина для pandas_udf? - person thentangler; 14.09.2020
comment
Ах, извините ... Я предположил, что функция stats.jarque_bera вернула серию Pandas, но на самом деле она возвращает два скаляра. Это не подходит для векторизации. Думаю, вам нужно найти (или написать) распараллеленную реализацию. - person AltShift; 18.09.2020