Pyspark levenshtein Join застрял на одной сцене

Я хочу выполнить соединение на основе расстояния Левенштейна.

У меня есть 2 кадра данных:

  • Данные: небольшой фрейм данных со 130 000 строк.

  • Сирена: большой фрейм данных, представляющий таблицу stockunitelegale базы данных SIREN с 20 миллионами строк.

Я попытался передать небольшой фрейм данных и выполнить соединение с большим набором данных. Задание Spark всегда зависает на последних трех задачах присоединения...


data=broadcast(data)
df=siren.join(spk, 1-(fn.levenshtein(data["description_nom"], 
                               siren["denominationUniteLegale"])/ fn.greatest(fn.length(data["description_nom"]),
                                                                           fn.length(siren["denominationUniteLegale"])))>0.7)
df.write.csv("join_siren.csv")

Я также пытался увеличить количество исполнителей, количество ядер и выделенную память, но у меня было такое же поведение с точки зрения обработки.

--num-executors 20 --conf "spark.executor.memory=12g" --conf "spark.executor.cores=15" --conf "spark.sql.shuffle.partitions=1000" --conf "spark.default.parallelism=1000" --conf "spark.driver.maxResultSize=8g" --conf "spark.sql.autoBroadcastJoinThreshold=20485760" --conf "spark.network.timeout=10000000"

У вас есть какие-либо решения, пожалуйста, для такого рода проблемы?


person bms    schedule 11.02.2021    source источник
comment
Попробуйте пошагово, игнорируйте широковещательную рассылку, просто сначала присоединитесь к таблицам joined_df =df1.join(df2,levenshtein(df1['description'], df2['description'])) и сохраните. Затем в новом скрипте выполните другие операции, используя умное сокращение карты или UDF.   -  person Leo103    schedule 12.02.2021
comment
@ Leo103 Спасибо за ответ. Мы не можем сделать соединение с levenstein без условия.   -  person bms    schedule 12.02.2021


Ответы (1)


Несмотря на то, что один из ваших фреймов данных мал, то, что вы, по сути, делаете, является декартовым произведением с этим соединением. Потому что для того, чтобы искра знала, какая комбинация строк удовлетворяет вашему условию, она должна сначала создать каждую отдельную комбинацию строк из левого и правого фрейма данных соединения. Это означает, что 130k умножить на 20m, что будет два триллиона шестьсот миллиардов строк. Это довольно большой.

Я бы предложил сначала присоединиться к какому-то другому условию, которое уменьшит пространство поиска вашего нечеткого совпадения имен. Например, вы можете использовать что-то вроде почтового индекса. Некоторым другим примером может быть разделение ваших столбцов description_nom и denominationUniteLegale по пространству и объединение по любому совпадению токенов. А потом фильтровать

    1-(fn.levenshtein(data["description_nom"], 
                                   siren["denominationUniteLegale"])/ fn.greatest(fn.length(data["description_nom"]),
                                                                               fn.length(siren["denominationUniteLegale"])))>0.7

Это будет зависеть от того, что у вас есть в ваших кадрах данных. Вам придется немного поэкспериментировать, что работает хорошо. Например, с предложением почтового индекса вы можете сделать это:

df=siren.join(data, data["postcode"]==siren["codePostalEtablissement"])
df=df.filter(1-(fn.levenshtein(data["description_nom"], 
                               siren["denominationUniteLegale"])/ fn.greatest(fn.length(data["description_nom"]),
                                                                               fn.length(siren["denominationUniteLegale"])))>0.7)
df.write.csv("join_siren.csv")

Конечно, это всего лишь пример, и вы можете комбинировать различные условия в соединении, такие как соединение по почтовому индексу, имени улицы или совпадению токена имени. Но вам все равно нужно проверить, что он не будет генерировать слишком много строк, как это было с cartesianJoin. Для этого вы должны проверить любое из ваших условий в худшем случае. Например, для почтового индекса, если вы посчитаете количество строк в data и siren, сгруппированных по почтовому индексу, определите наибольшую комбинацию. Предполагая, что у вас есть столбец с именем postcode в data, вы можете получить эту информацию следующим образом:

dataPostcodeCount = data.groupBy('postcode').count()
sirenPostcodeCount = siren.groupBy('codePostalEtablissement').count()
postCodeCounts = dataPostcodeCount \
    .join(sirenPostcodeCount, data['postcode'] == siren['codePostalEtablissement']) \
    .select('postcode', (data['count']*siren['count']).alias('count'))
postCodeCounts.sortBy(postCodeCounts['count'].desc()).show()
person Seb    schedule 12.02.2021