выберите с помощью оконной функции (dense_rank()) в SparkSQL

У меня есть таблица, которая содержит записи о покупках клиентов, мне нужно указать, что покупка была сделана в определенном окне даты и времени, одно окно составляет 8 дней, поэтому, если у меня была покупка сегодня и одна через 5 дней, это означает, что моя покупка, если окно номер 1, но если я сделал это в первый день сегодня и в следующий через 8 дней, первая покупка будет в окне 1, а последняя покупка - в окне 2.

create temporary table transactions
 (client_id int,
 transaction_ts datetime,
 store_id int)

 insert into transactions values 
 (1,'2018-06-01 12:17:37', 1),
 (1,'2018-06-02 13:17:37', 2),
 (1,'2018-06-03 14:17:37', 3),
 (1,'2018-06-09 10:17:37', 2),
 (2,'2018-06-02 10:17:37', 1),
 (2,'2018-06-02 13:17:37', 2),
 (2,'2018-06-08 14:19:37', 3),
 (2,'2018-06-16 13:17:37', 2),
 (2,'2018-06-17 14:17:37', 3)

окно 8 дней, проблема в том, что я не понимаю, как указать для плотности_rank() OVER (PARTITION BY), чтобы посмотреть дату и время и сделать окно через 8 дней, в результате мне нужно что-то вроде этого

1,'2018-06-01 12:17:37', 1,1
1,'2018-06-02 13:17:37', 2,1
1,'2018-06-03 14:17:37', 3,1
1,'2018-06-09 10:17:37', 2,2
2,'2018-06-02 10:17:37', 1,1
2,'2018-06-02 13:17:37', 2,1
2,'2018-06-08 14:19:37', 3,2
2,'2018-06-16 13:17:37', 2,3
2,'2018-06-17 14:17:37', 3,3

Любая идея, как это получить? Я могу запустить его в Mysql или Spark SQL, но Mysql не поддерживает разделы. До сих пор не могу найти решение! любая помощь


person Andrey    schedule 25.07.2018    source источник
comment
Чтобы делать то, что вы хотите, вам нужны рекурсивные CTE. MySQL поддерживает их в v8+. Я не знаю, поддерживает ли их SparkSQL.   -  person Gordon Linoff    schedule 25.07.2018
comment
@GordonLinoff есть идеи, как это исправить? я . знаю концепцию CTE, но не знаю, как ее использовать. в таком случае   -  person Andrey    schedule 25.07.2018


Ответы (2)


Скорее всего, вы можете решить это в Spark SQL, используя функции окна времени и раздела:

val purchases = Seq((1,"2018-06-01 12:17:37", 1), (1,"2018-06-02 13:17:37", 2), (1,"2018-06-03 14:17:37", 3), (1,"2018-06-09 10:17:37", 2), (2,"2018-06-02 10:17:37", 1), (2,"2018-06-02 13:17:37", 2), (2,"2018-06-08 14:19:37", 3), (2,"2018-06-16 13:17:37", 2), (2,"2018-06-17 14:17:37", 3)).toDF("client_id", "transaction_ts", "store_id")

purchases.show(false)
+---------+-------------------+--------+
|client_id|transaction_ts     |store_id|
+---------+-------------------+--------+
|1        |2018-06-01 12:17:37|1       |
|1        |2018-06-02 13:17:37|2       |
|1        |2018-06-03 14:17:37|3       |
|1        |2018-06-09 10:17:37|2       |
|2        |2018-06-02 10:17:37|1       |
|2        |2018-06-02 13:17:37|2       |
|2        |2018-06-08 14:19:37|3       |
|2        |2018-06-16 13:17:37|2       |
|2        |2018-06-17 14:17:37|3       |
+---------+-------------------+--------+



val groupedByTimeWindow = purchases.groupBy($"client_id", window($"transaction_ts", "8 days")).agg(collect_list("transaction_ts").as("transaction_tss"), collect_list("store_id").as("store_ids"))

val withWindowNumber = groupedByTimeWindow.withColumn("window_number", row_number().over(windowByClient))

withWindowNumber.orderBy("client_id", "window.start").show(false)

    +---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
|client_id|window                                       |transaction_tss                                                |store_ids|window_number|
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+
|1        |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-01 12:17:37, 2018-06-02 13:17:37, 2018-06-03 14:17:37]|[1, 2, 3]|1            |
|1        |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-09 10:17:37]                                          |[2]      |2            |
|2        |[2018-05-28 17:00:00.0,2018-06-05 17:00:00.0]|[2018-06-02 10:17:37, 2018-06-02 13:17:37]                     |[1, 2]   |1            |
|2        |[2018-06-05 17:00:00.0,2018-06-13 17:00:00.0]|[2018-06-08 14:19:37]                                          |[3]      |2            |
|2        |[2018-06-13 17:00:00.0,2018-06-21 17:00:00.0]|[2018-06-16 13:17:37, 2018-06-17 14:17:37]                     |[2, 3]   |3            |
+---------+---------------------------------------------+---------------------------------------------------------------+---------+-------------+

Если вам нужно, вы можете explode перечислить элементы из store_ids или transaction_tss.

Надеюсь, поможет!

person Mikhail Dubkov    schedule 26.07.2018

Я не использовал предложенное искровое решение, я сделал это с чистой логикой sql и курсором. это не очень эффективно, но мне нужно сделать работу

person Andrey    schedule 27.07.2018