Я новичок в том, чтобы зажигать и зажигать фреймы данных. У меня есть искровый фреймворк, например:
# For sake of simplicity only one user (uid) is shown, but there are multiple users
+-------------------+-----+-------+
|start_date |uid |count |
+-------------------+-----+-------+
|2020-11-26 08:30:22|user1| 4 |
|2020-11-26 10:00:00|user1| 3 |
|2020-11-22 08:37:18|user1| 3 |
|2020-11-22 13:32:30|user1| 2 |
|2020-11-20 16:04:04|user1| 2 |
|2020-11-16 12:04:04|user1| 1 |
Я хочу создать новый логический столбец со значениями True / False, если в прошлом у пользователя было хотя бы количество событий ›= x, и пометьте эти события значком Правда. Например, для x = 3 я ожидаю получить:
+-------------------+-----+-------+--------------+
|start_date |uid |count | marked_event |
+-------------------+-----+-------+--------------+
|2020-11-26 08:30:22|user1| 4 | True |
|2020-11-26 10:00:00|user1| 3 | True |
|2020-11-22 08:37:18|user1| 3 | True |
|2020-11-22 13:32:30|user1| 2 | True |
|2020-11-20 16:04:04|user1| 2 | True |
|2020-11-16 12:04:04|user1| 1 | False |
То есть для каждого count ›= 3 мне нужно отметить это событие True, а также предыдущие 3-события. Только последнее событие user1 имеет значение False, потому что я отмечаю 3 события перед (включительно) событием start_date = 2020-11-22 08:37:18.
Есть идеи, как подойти к этому? Моя интуиция заключается в том, чтобы как-то использовать окно / задержку для достижения этого, но я новичок в Spark и не уверен, как это сделать ...
Спасибо!
РЕДАКТИРОВАТЬ:
Я закончил использовать вариант решения @mck с небольшим исправлением ошибки: исходное решение имеет:
F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing))
условие, которое заканчивается маркировкой всех событий после 'begin', независимо от того, выполняются ли условия 'count' или нет. Вместо этого я изменил решение, чтобы в окне были отмечены только события, произошедшие до начала:
event = (f.max(f.col('begin')).over(w.rowsBetween(-2, 0))).\
alias('event_post_only')
# the number of events to mark is 3 from 'begin',
# including the event itself, so that's -2.
df_marked_events = df_marked_events.select('*', event)
Затем отметьте True для всех событий, которые были True в 'event_post_only' ИЛИ были True в 'event_post_only'
df_marked_events = df_marked_events.withColumn('event', (col('count') >= 3) \
| (col('event_post_only')))
Это позволяет избежать пометки True для всего восходящего потока как 'begin' == True.