Функция окна PySpark: несколько условий в порядке от rangeBetween / rowsBetween

Можно ли создать функцию Window, которая может иметь несколько условий в orderBy для rangeBetween или rowsBetween. Предположим, у меня есть фрейм данных, как показано ниже.

user_id     timestamp               date        event
0040b5f0    2018-01-22 13:04:32     2018-01-22  1       
0040b5f0    2018-01-22 13:04:35     2018-01-22  0   
0040b5f0    2018-01-25 18:55:08     2018-01-25  1       
0040b5f0    2018-01-25 18:56:17     2018-01-25  1       
0040b5f0    2018-01-25 20:51:43     2018-01-25  1       
0040b5f0    2018-01-31 07:48:43     2018-01-31  1       
0040b5f0    2018-01-31 07:48:48     2018-01-31  0       
0040b5f0    2018-02-02 09:40:58     2018-02-02  1       
0040b5f0    2018-02-02 09:41:01     2018-02-02  0       
0040b5f0    2018-02-05 14:03:27     2018-02-05  1       

Для каждой строки мне нужна сумма значений столбца event, дата которых не превышает 3 дней. Но я не могу суммировать события, которые произошли позже в тот же день. Я могу создать оконную функцию, например:

days = lambda i: i * 86400
my_window = Window\
                .partitionBy(["user_id"])\
                .orderBy(F.col("date").cast("timestamp").cast("long"))\
                .rangeBetween(-days(3), 0)

Но это будет включать события, которые произошли позже в тот же день. Мне нужно создать оконную функцию, которая будет действовать как (для строки с *):

user_id     timestamp               date        event
0040b5f0    2018-01-22 13:04:32     2018-01-22  1----|==============|   
0040b5f0    2018-01-22 13:04:35     2018-01-22  0  sum here       all events
0040b5f0    2018-01-25 18:55:08     2018-01-25  1 only           within 3 days 
* 0040b5f0  2018-01-25 18:56:17     2018-01-25  1----|              |
0040b5f0    2018-01-25 20:51:43     2018-01-25  1===================|       
0040b5f0    2018-01-31 07:48:43     2018-01-31  1       
0040b5f0    2018-01-31 07:48:48     2018-01-31  0       
0040b5f0    2018-02-02 09:40:58     2018-02-02  1       
0040b5f0    2018-02-02 09:41:01     2018-02-02  0       
0040b5f0    2018-02-05 14:03:27     2018-02-05  1       

Я пытался создать что-то вроде:

days = lambda i: i * 86400
my_window = Window\
                .partitionBy(["user_id"])\
                .orderBy(F.col("date").cast("timestamp").cast("long"))\
                .rangeBetween(-days(3), Window.currentRow)\
                .orderBy(F.col("t_stamp"))\
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)

Но он отражает только последний orderBy.

Таблица результатов должна выглядеть так:

user_id     timestamp               date        event   event_last_3d
0040b5f0    2018-01-22 13:04:32     2018-01-22  1       1
0040b5f0    2018-01-22 13:04:35     2018-01-22  0       1
0040b5f0    2018-01-25 18:55:08     2018-01-25  1       2
0040b5f0    2018-01-25 18:56:17     2018-01-25  1       3
0040b5f0    2018-01-25 20:51:43     2018-01-25  1       4
0040b5f0    2018-01-31 07:48:43     2018-01-31  1       1
0040b5f0    2018-01-31 07:48:48     2018-01-31  0       1
0040b5f0    2018-02-02 09:40:58     2018-02-02  1       2
0040b5f0    2018-02-02 09:41:01     2018-02-02  0       2
0040b5f0    2018-02-05 14:03:27     2018-02-05  1       2

Я застрял на этом в течение некоторого времени, я был бы признателен за любой совет о том, как к нему подойти.


person Alexandr Serbinovskiy    schedule 08.02.2018    source источник
comment
Разве нельзя просто упорядочить по столбцу метки времени напрямую и пропустить сортировку по дате (поскольку метка времени содержит дату в противоположном направлении)?   -  person Shaido    schedule 19.03.2018
comment
@B_Miner - пинг   -  person sujit    schedule 23.03.2018
comment
Спасибо, суджит, я попробую подключить scala (с которым я не работаю) к python.   -  person B_Miner    schedule 25.03.2018


Ответы (1)


Я написал эквивалент на scala, который соответствует вашим требованиям. Я думаю, что преобразовать в python не составит труда:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val DAY_SECS = 24*60*60 //Seconds in a day
//Given a timestamp in seconds, returns the seconds equivalent of 00:00:00 of that date
val trimToDateBoundary = (d: Long) => (d / 86400) * 86400
//Using 4 for range here - since your requirement is to cover 3 days prev, which date wise inclusive is 4 days
//So e.g. given any TS of 25 Jan, the range will cover (25 Jan 00:00:00 - 4 times day_secs = 22 Jan 00:00:00) to current TS
val wSpec = Window.partitionBy("user_id").
                orderBy(col("timestamp").cast("long")).
                rangeBetween(trimToDateBoundary(Window.currentRow)-(4*DAY_SECS), Window.currentRow)
df.withColumn("sum", sum('event) over wSpec).show()

Ниже приведен результат этого при применении к вашим данным:

+--------+--------------------+--------------------+-----+---+
| user_id|           timestamp|                date|event|sum|
+--------+--------------------+--------------------+-----+---+
|0040b5f0|2018-01-22 13:04:...|2018-01-22 00:00:...|  1.0|1.0|
|0040b5f0|2018-01-22 13:04:...|2018-01-22 00:00:...|  0.0|1.0|
|0040b5f0|2018-01-25 18:55:...|2018-01-25 00:00:...|  1.0|2.0|
|0040b5f0|2018-01-25 18:56:...|2018-01-25 00:00:...|  1.0|3.0|
|0040b5f0|2018-01-25 20:51:...|2018-01-25 00:00:...|  1.0|4.0|
|0040b5f0|2018-01-31 07:48:...|2018-01-31 00:00:...|  1.0|1.0|
|0040b5f0|2018-01-31 07:48:...|2018-01-31 00:00:...|  0.0|1.0|
|0040b5f0|2018-02-02 09:40:...|2018-02-02 00:00:...|  1.0|2.0|
|0040b5f0|2018-02-02 09:41:...|2018-02-02 00:00:...|  0.0|2.0|
|0040b5f0|2018-02-05 14:03:...|2018-02-05 00:00:...|  1.0|2.0|
+--------+--------------------+--------------------+-----+---+

Я не использовал столбец «дата». Не уверен, как мы можем выполнить ваше требование с учетом этого. Итак, если существует вероятность того, что дата TS отличается от столбца даты, это решение не покрывает ее.

Примечание. rangeBetween, который принимает Column аргументы, были введены в Spark 2.3.0, который принимает столбцы типа даты / времени. Так что это решение может быть более элегантным.

person sujit    schedule 22.03.2018