Пользовательский агрегатный расчет Pyspark по столбцам

Я готовлю данные для ввода для классификатора в Pyspark. Я использовал агрегатные функции в SparkSQL для извлечения таких функций, как среднее значение и дисперсия. Они сгруппированы по активности, имени и окну. Окно было рассчитано путем деления временной метки unix на 10000, чтобы разбить на 10-секундные временные окна.

sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data  GROUP BY activity,name,window ORDER BY activity,name,window")

Результат этого будет выглядеть так:

Activity  Name         Window       AvgX       VarX
Walk    accelerometer  95875        2.0          1.0

Сейчас я хочу вычислить средний наклон каждой точки в X.

Для этого мне нужны временная метка, окно и X. Я реализовал логику в Python, используя массивы, вот как это будет выглядеть - вычисление наклона между каждой точкой, а затем получение среднего наклона. В идеале я хотел бы сделать это в UDAF, который еще не поддерживается в Pyspark. (Это будет выглядеть так, скажем, если бы функция ниже называлась slope. Тогда в sql вы могли бы сделать slope(timestamp, X) as avgSlopeX

РЕДАКТИРОВАТЬ - изменен ввод, чтобы он был более четким. Итак, что я делаю в точности, это вычисляю уклон между каждой точкой, а затем возвращаю среднее значение уклонов в этом окне. Итак, поскольку я получаю среднее значение и дисперсию каждого окна, я также хочу получить средний наклон.

#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]

values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]

i = 0; 
slope = 0.0;
totalSlope = 0.0;

while (i < len(timestamp) - 1):
    y2 = values[i+1];
    y1 = values[i];

    x2 = timestamp[i + 1];
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope;
    i=i+1

avgSlope = (totalSlope/len(x_values))

Как я могу это реализовать? Должен ли я попробовать преобразовать в фрейм данных pandas, а затем в массив numpy? Если да, то как я могу убедиться, что данные по-прежнему будут отображаться правильно, имея в виду действие GROUP BY, окно имени в запросе sql.


person other15    schedule 01.07.2016    source источник
comment
Это определенно не работа UDAF.   -  person zero323    schedule 04.07.2016
comment
@ zero323, как бы вы к этому подойти?   -  person other15    schedule 04.07.2016
comment
Вычислите наклон для последовательных точек, а затем возьмите простое среднее. Но описание ввода здесь довольно расплывчатое. Не могли бы вы опубликовать пример данных с ожидаемым результатом?   -  person zero323    schedule 04.07.2016
comment
В частности, вы ожидаете уклон между windows? Если нет, то где метка времени для определения порядка строк?   -  person zero323    schedule 04.07.2016
comment
@ zero323 Я отредактировал образец ввода, чтобы сделать его более понятным. Вычисление наклона последовательных точек и получение среднего - это именно то, что я хочу сделать, но не знаю, какой правильный подход будет делать это в Spark.   -  person other15    schedule 04.07.2016
comment
Я больше думал о схеме SQL / DF :)   -  person zero323    schedule 04.07.2016
comment
@ zero323 Я попробую сделать это, конвертируя в pandas, а затем numpy, но это будет очень неэффективно. :( Если бы вы это сделали, как бы вы это сделали?   -  person other15    schedule 04.07.2016


Ответы (1)


В общем, это не работа для UDAF, потому что UDAF не предоставляют никаких средств для определения порядка. Похоже, что вам действительно нужна комбинация оконных функций и стандартных агрегатов.

from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window

df = ... 
## DataFrame[activity: string, name: string, window: bigint, 
##   timestamp: bigint, value: float]

group = ["activity", "name", "window"]

w = (Window()
    .partitionBy(*group)
    .orderBy("timestamp"))

v_diff = col("value") - lag("value", 1).over(w)
t_diff = col("timestamp") - lag("timestamp", 1).over(w)

slope = v_diff / t_diff

df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope")))
person zero323    schedule 04.07.2016
comment
это кажется хорошим подходом! но withColumn, похоже, игнорирует slope и вместо этого просто возвращает среднее значение: / если slope был зарегистрирован как обычная функция, можно ли его использовать? - person other15; 04.07.2016
comment
Опечатка. Он должен быть наклонным в предложении agg. - person zero323; 04.07.2016