искра: Как сделать dropDuplicates в кадре данных, сохранив строку с самой высокой отметкой времени

У меня есть случай использования, когда мне нужно удалить повторяющиеся строки фрейма данных (в этом случае дубликат означает, что у них есть одно и то же поле «id»), сохраняя строку с самым высоким полем «timestamp» (unix timestamp).

Я нашел метод drop_duplicate (я использую pyspark), но у него нет контроля над тем, какой элемент будет сохранен.

Кто-нибудь может помочь? Спасибо заранее


person arnaud briche    schedule 14.04.2016    source источник


Ответы (2)


Для обеспечения желаемой функциональности может потребоваться ручное сопоставление и сокращение.

def selectRowByTimeStamp(x,y):
    if x.timestamp > y.timestamp:
        return x
    return y

dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp) 

Здесь мы группируем все данные по идентификатору. Затем, когда мы сокращаем группировки, мы делаем это, сохраняя запись с самой высокой меткой времени. Когда код будет уменьшен, для каждого идентификатора останется только 1 запись.

person David    schedule 14.04.2016
comment
Какова цель dataMap? - person zero323; 14.04.2016
comment
На самом деле это должно быть data.map(lambda x: (x.id, x)) (или keyBy). Давайте просто исправим это. - person zero323; 14.04.2016
comment
Абсолютно правильный, приятный улов - person David; 14.04.2016
comment
Спасибо, Дэвид, в итоге я воспользовался предложенным вами решением, хотя решение Дэвида Гриффина сработало одинаково хорошо. - person arnaud briche; 15.04.2016

Вы можете сделать что-то вроде этого:

val df = Seq(
  (1,12345678,"this is a test"),
  (1,23456789, "another test"),
  (2,2345678,"2nd test"),
  (2,1234567, "2nd another test")
).toDF("id","timestamp","data")

+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+

df.join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp|        data|
+---+---------+------------+
|  1| 23456789|another test|
|  2|  2345678|    2nd test|
+---+---------+------------+

Если вы ожидаете, что может быть повторение timestamp для id (см. Комментарии ниже), вы можете сделать это:

df.dropDuplicates(Seq("id", "timestamp")).join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
person David Griffin    schedule 14.04.2016
comment
Это близко, но не гарантирует, что для каждого идентификатора будет одна строка. - person zero323; 14.04.2016
comment
Хм, вы имеете в виду, если метка времени такая же? - person David Griffin; 14.04.2016
comment
Да, точно. Думаю, здесь должна быть возможность просто взять произвольный. - person zero323; 14.04.2016
comment
Это одна из тех вещей, где, если вы знаете свои данные, это может не быть проблемой. И, как вы сказали, вы всегда можете сделать: df.groupBy($"id", $"timestamp").agg(last($"data")), прежде чем делать все остальное. - person David Griffin; 14.04.2016
comment
Это. drop_duplicate мог бы быть более универсальным, чем last. Вы можете обрабатывать всю строку без смешивания значений. - person zero323; 14.04.2016
comment
Ага. Отредактировано. Никогда раньше не использовал dropDuplicates. С другой стороны, у вас, похоже, нет никакого контроля над тем, какая строка удаляется - я предполагаю, что это произвольно. - person David Griffin; 14.04.2016
comment
Спасибо :) Уже проголосовали, так что больше не могу;) - person zero323; 14.04.2016
comment
@David Griffi, отброшенная строка основана на вызове метода first () / head (). Из заданного набора данных, если вы примените вызов метода first (), остальные строки будут удалены при вызове drop_duplicates (). - person Kanav Sharma; 06.07.2017