Состояние фрейма данных pySpark по окну / задержке

Я новичок в том, чтобы зажигать и зажигать фреймы данных. У меня есть искровый фреймворк, например:

# For sake of simplicity only one user (uid) is shown, but there are multiple users 
+-------------------+-----+-------+
|start_date         |uid  |count  |
+-------------------+-----+-------+
|2020-11-26 08:30:22|user1|  4    |
|2020-11-26 10:00:00|user1|  3    |
|2020-11-22 08:37:18|user1|  3    |
|2020-11-22 13:32:30|user1|  2    |
|2020-11-20 16:04:04|user1|  2    |
|2020-11-16 12:04:04|user1|  1    |

Я хочу создать новый логический столбец со значениями True / False, если в прошлом у пользователя было хотя бы количество событий ›= x, и пометьте эти события значком Правда. Например, для x = 3 я ожидаю получить:

+-------------------+-----+-------+--------------+
|start_date         |uid  |count  | marked_event |
+-------------------+-----+-------+--------------+
|2020-11-26 08:30:22|user1|  4    |  True        |
|2020-11-26 10:00:00|user1|  3    |  True        |
|2020-11-22 08:37:18|user1|  3    |  True        |
|2020-11-22 13:32:30|user1|  2    |  True        |
|2020-11-20 16:04:04|user1|  2    |  True        |
|2020-11-16 12:04:04|user1|  1    |  False       |

То есть для каждого count ›= 3 мне нужно отметить это событие True, а также предыдущие 3-события. Только последнее событие user1 имеет значение False, потому что я отмечаю 3 события перед (включительно) событием start_date = 2020-11-22 08:37:18.

Есть идеи, как подойти к этому? Моя интуиция заключается в том, чтобы как-то использовать окно / задержку для достижения этого, но я новичок в Spark и не уверен, как это сделать ...

Спасибо!


РЕДАКТИРОВАТЬ:

Я закончил использовать вариант решения @mck с небольшим исправлением ошибки: исходное решение имеет:

F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing))

условие, которое заканчивается маркировкой всех событий после 'begin', независимо от того, выполняются ли условия 'count' или нет. Вместо этого я изменил решение, чтобы в окне были отмечены только события, произошедшие до начала:

event = (f.max(f.col('begin')).over(w.rowsBetween(-2, 0))).\ 
          alias('event_post_only') 
# the number of events to mark is 3 from 'begin', 
# including the event itself, so that's -2.
df_marked_events = df_marked_events.select('*', event)

Затем отметьте True для всех событий, которые были True в 'event_post_only' ИЛИ ​​были True в 'event_post_only'

df_marked_events = df_marked_events.withColumn('event', (col('count') >= 3) \
                       | (col('event_post_only')))

Это позволяет избежать пометки True для всего восходящего потока как 'begin' == True.




Ответы (1)


person    schedule
comment
Спасибо!! Думаю, он мне подходит почти идеально. Мне потребовалось некоторое время, чтобы понять почему и как это работает. Большое спасибо! - person Ruslan; 30.11.2020
comment
извините за отсутствие документации. надеюсь, это поможет! - person mck; 30.11.2020
comment
Без проблем! Отлично работает :) - person Ruslan; 30.11.2020
comment
Я добавил несколько кратких комментариев, надеюсь, что это поможет другим пользователям - person mck; 30.11.2020