PySpark: раздвижные окна для выборочных строк

У меня есть кадр данных, содержащий следующие 3 столбца: 1. ID 2. метка времени 3. IP_Address

Данные охватывают период с 2019–07 –01 по 20.09.2019. Я пытаюсь агрегировать количество IP_address за последние 60 дней, разделенных по идентификатору для всех строк между 20-дневным периодом с 2019 г. с 09 -01 по 20 сентября 2019 г. .

Я пробовал использовать следующую оконную функцию, и она отлично работает:

days = lambda i: i*86400
w =  Window.partitionBy('id')\
           .orderBy(unix_timestamp(col('timestamp')))\
           .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.withColumn("ip_counts", count(df.ip_address).over(w))

Однако проблема заключается в том, что он вычисляет эти агрегаты даже для периода, для которого мне не нужны вычисления: с 2019-07-01 по 2019-08-31. Я мог бы легко отфильтровать результаты за выбранный период ретроспективно после расчетов, но мне не нужны ненужные вычисления, поскольку я имею дело с ~ 3-10 миллионами строк в день.

Если я фильтрую фрейм данных следующим образом:

dates = ('2019-09-01', '2019-09-20')
date_from, date_to = [F.to_date(F.lit(s)).cast("timestamp") for s in dates]

w =  Window.partitionBy('id')\
           .orderBy(unix_timestamp(col('timestamp')))\
           .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.where((df.timestamp >= date_from) & (df.timestamp <= date_to))\
       .withColumn("ip_counts", count(df.ip_address).over(w))

в этом случае идентификаторы между этими днями не могут получить доступ к данным для этих идентификаторов за предыдущие 60 дней, и, следовательно, подсчеты неверны.

Что я могу сделать, чтобы вычислить агрегаты только для строк, попадающих в период с 01.09.2019 по 20.09.2019, и в то же время убедиться, что окна имеют доступ к данным за предыдущие 60 дней для каждого из тех агрегаций. Спасибо большое за вашу помощь.


person Tarun    schedule 26.09.2019    source источник


Ответы (1)


Сначала я бы создал новый фрейм данных, содержащий все данные за последние 60 дней, а затем следую вашему первому методу, вычисляя агрегаты только для строк, попадающих в период с 01.09.2019 по 20.09.2019.

person yuqi    schedule 27.09.2019