Как выбрать стратифицированную выборку одинакового размера из фрейма данных в Apache Spark?

У меня есть фрейм данных в Spark 2, как показано ниже, где у пользователей от 50 до тысяч сообщений. Я хотел бы создать новый фрейм данных, в котором будут все пользователи в исходном фрейме данных, но только с 5 случайно выбранными сообщениями для каждого пользователя.

+--------+--------------+--------------------+
| user_id|       post_id|                text|
+--------+--------------+--------------------+
|67778705|44783131591473|some text...........|
|67778705|44783134580755|some text...........|
|67778705|44783136367108|some text...........|
|67778705|44783136970669|some text...........|
|67778705|44783138143396|some text...........|
|67778705|44783155162624|some text...........|
|67778705|44783688650554|some text...........|
|68950272|88655645825660|some text...........|
|68950272|88651393135293|some text...........|
|68950272|88652615409812|some text...........|
|68950272|88655744880460|some text...........|
|68950272|88658059871568|some text...........|
|68950272|88656994832475|some text...........|
+--------+--------------+--------------------+

Что-то вроде posts.groupby('user_id').agg(sample('post_id')) но в pyspark такой функции нет.

Любой совет?

Обновление:

Этот вопрос отличается от другого тесно связанного с ним вопроса stratified-sampling-in-spark двумя способами:

  1. Он спрашивает о непропорциональной стратифицированной выборке, а не об общем пропорциональном методе в другом вопросе выше.
  2. Он спрашивает об этом в Spark Dataframe API, а не в RDD.

Я также обновил заголовок вопроса, чтобы прояснить это.


person Majid Alfifi    schedule 07.01.2017    source источник


Ответы (2)


Использование sampleBy приведет к приблизительному решению. Вот альтернативный подход, который немного более хитрый, чем подход, описанный выше, но всегда приводит к точно таким же размерам выборки.

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

df.withColumn("row_num",row_number().over(Window.partitionBy($"user_id").orderBy($"something_random"))

Если у вас еще нет случайного идентификатора, вы можете использовать org.apache.spark.sql.functions.rand для создания столбца со случайным значением, чтобы гарантировать случайную выборку.

person BushMinusZero    schedule 25.10.2017
comment
Красивый элегантный подход! Я выбираю это как ответ. (не уверен, есть ли проблемы с производительностью) - person Majid Alfifi; 08.09.2018
comment
вы можете фильтровать по row_num, чтобы получить необходимое количество выборок, следующим образом: df.withColumn (row_num, row_number (). over (Window.partitionBy ($ user_id) .orderBy ($ something_random)). where (col (row_num) ‹ = 5) - person Balázs Fehér; 19.09.2018

Вы можете использовать метод .sampleBy(...) для DataFrames http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Вот рабочий пример:

import numpy as np
import string
import random

# generate some fake data
p = [(
    str(int(e)), 
    ''.join(
        random.choice(
            string.ascii_uppercase + string.digits) 
        for _ in range(10)
    )
) for e in np.random.normal(10, 1, 10000)]

posts = spark.createDataFrame(p, ['label', 'val'])

# define the sample size
percent_back = 0.05

# use this if you want an (almost) exact number of samples
# sample_count = 200
# percent_back = sample_count / posts.count()

frac = dict(
    (e.label, percent_back) 
    for e 
    in posts.select('label').distinct().collect()
)

# use this if you want (almost) balanced sample
# f = posts.groupby('label').count()

# f_min_count can also be specified to be exact number 

# e.g. f_min_count = 5

# as long as it is less the the minimum count of posts per user
# calculated from all the users

# alternatively, you can take the minimum post count
# f_min_count = f.select('count').agg(func.min('count').alias('minVal')).collect()[0].minVal

# f = f.withColumn('frac',f_min_count/func.col('count'))

# frac = dict(f.select('label', 'frac').collect())

# sample the data
sampled = posts.sampleBy('label', fractions=frac)

# compare the original counts with sampled
original_total_count = posts.count()
original_counts = posts.groupby('label').count()
original_counts = original_counts \
    .withColumn('count_perc', 
                original_counts['count'] / original_total_count)

sampled_total_count = sampled.count()
sampled_counts = sampled.groupBy('label').count()
sampled_counts = sampled_counts \
    .withColumn('count_perc', 
                sampled_counts['count'] / sampled_total_count)


print(original_counts.sort('label').show(100))
print(sampled_counts.sort('label').show(100))

print(sampled_total_count)
print(sampled_total_count / original_total_count)
person TDrabas    schedule 07.01.2017
comment
Хороший пример! Не могли бы вы пояснить, как получить точное количество образцов? Я запустил закомментированный код выше, но все равно получаю разные размеры выборки. - person Majid Alfifi; 08.01.2017
comment
Вы получите не точное число, а что-то близкое к нему. Это просто другой способ определения процентов. В приведенном выше случае это будет 200/10000 = 2%. - person TDrabas; 08.01.2017
comment
Мне удалось получить образцы, достаточно близкие к размеру выборки, в которой я нуждался, определяя дроби следующим образом: frac = posts.groupby ('label'). Count (). WithColumn ('frac', sample_count / F.col ('count' ) .toPandas (). set_index ('метка') ['гидроразрыв »]. to_dict () - person Majid Alfifi; 08.01.2017
comment
Если бы вы могли включить в свой ответ указанное выше изменение, я могу принять его. - person Majid Alfifi; 08.01.2017
comment
Как это будет работать? sample_count = 200, и вы делите его на количество для каждого label. Например, label = 6 будет иметь ~ 10 наблюдений. Затем ваша функция оценивается как 20, и это то, что вы не можете передать как fractions методу .sampleBy(...). Более того, то, что вы получите взамен, не будет стратифицированной выборкой, то есть выборкой с теми же пропорциями значений меток, что и в исходном наборе данных. Метод .sampleBy(...), скрытый под капотом, выполняет n (где n - количество values в label) единообразной выборке из всех записей, где label == val. - person TDrabas; 08.01.2017
comment
В вашем коде вы передаете одну и ту же дробь всем меткам, но это все равно даст мне больший размер выборки для меток с большим количеством значений. Чтобы учесть это, я даю меньшие отношения для меток с большим количеством значений и большие отношения для недостаточно представленных меток. Я по-прежнему передаю только ratios dict в sampleBy, как обычно. - person Majid Alfifi; 08.01.2017
comment
Другими словами, вам нужна не стратифицированная выборка, а сбалансированная. Просто ожидайте, что это не будет равно количеству образцов, которое вы установили, поскольку оно будет ограничено счетчиком наименьшего значения. Следовательно, это то, что вы хотите: f = posts.groupby('label').count(); f_min_count = f.select('count').agg(func.min('count').alias('minVal')).collect(); f = f.withColumn('frac',f_min_count[0].minVal/func.col('count')) frac = dict(f.select('label', 'frac').collect()). Это вернет несколько сбалансированный образец (количество, скорее всего, не будет равным, поскольку мы имеем дело с пропорциями) - person TDrabas; 08.01.2017
comment
Кроме того, если вам нужны только 5 случайных сообщений от каждого пользователя, измените f_min_count[0].minVal на 5. Обратите внимание, что, поскольку это случайная выборка, и вы можете указать только пропорции для метода sampleBy(...). Кроме того, небольшое предостережение: случайное количество сообщений, запрашиваемых для каждого пользователя, должно быть меньше минимального количества сообщений от всех пользователей. - person TDrabas; 08.01.2017
comment
Мне не нужно было заниматься этим, потому что у меня было не менее 50 сообщений для любого пользователя, но ваше решение более общее. Я изменил свой образец на 20 и запустил его 10 раз, и получил от 10 до 33, что не идеально, но достаточно хорошо для моего эксперимента. - person Majid Alfifi; 08.01.2017
comment
Мне нужно получить 10% каждой метки в test df, поэтому я сделал dict со всей меткой класса в качестве ключа и 0,1 в качестве их значений, таких как {0: 0.1, 1:0.1, 2: 0.1, ..., 20:0.1}. Я предоставил этот dict как аргумент дроби функции sampleBy(), которая дает мне образец фрейма данных, затем я использую уникальный идентификатор для фильтрации основного df, чтобы получить тестовый df. Однако после этого есть разница между test_df.count() и len(test_df.collect()) каждый раз, когда эта разница является произвольной. Вы можете помочь, что здесь не так? @TDrabas - person Aditya; 09.07.2018