Как эффективно соединить очень большой стол и большой стол в Pyspark

У меня две таблицы. Обе таблицы являются внешними таблицами в улье, хранящимися в формате данных паркета.

Первая таблица table_1 содержит 250 миллионов строк ежедневно с 2015 года. Эта таблица разбита на разделы на основе create_date. Таким образом, на каждую create_date приходится около 250 миллионов строк.

Вторая таблица - table_2 - ежедневная дельта-таблица, и среднее количество строк составляет около 1,5 миллиона строк.

В обеих таблицах есть один общий столбец lookup_id. Теперь мне нужно получить все столбцы из таблицы_1 для дельта-данных из таблицы_2, используя фреймы данных.

Я думал сделать что-то вроде ниже

table_1=spark.table("table_1")
table_2=spark.table("table_2")
result_df=table_1.join(table_2, table_1.lookup_id=table_2.lookup_id, "inner").drop(table_2.lookup_id)

Но я сомневаюсь, что это действительно эффективно и сможет ли pyspark справиться с этим без каких-либо ошибок памяти.

Вопрос 1: Как распараллелить сканирование table_1 на основе разделов create_date?

Вопрос 2: Есть ли другой способ оптимизировать сканирование table_1 на основе lookup_ids из table_2 и / или на основе разделов?

Дополнительная информация, чтобы лучше понять, что я ищу:

Я пытаюсь понять, когда мы присоединяемся к таблицам с использованием фреймов данных, действительно ли Spark читает данные и хранит их в памяти и присоединяется к ним, или просто присоединяется при чтении самого себя. Если второе утверждение истинно, то для каких всех соединений применим второй оператор. Также, если есть необходимость использовать цикл, чтобы избежать ошибок памяти.


person Mohan    schedule 04.07.2020    source источник
comment
Есть ли связь между table1.create_date и table2.create_date? Например, верно ли, что если row1.lookup_id == row2.lookup_id, то row1.create_date == row2.create_date для row1 ∈ table1 и row2 ∈ table2?   -  person gudok    schedule 11.07.2020
comment
Нет, доступен только lookup_id. create_date недоступен   -  person Mohan    schedule 11.07.2020


Ответы (2)


Не уверен насчет памяти вашего драйвера и исполнителя, но в целом возможны две оптимизации соединения - широковещательная передача небольшой таблицы всем исполнителям и наличие одного и того же ключа раздела для обоих фреймов данных. В вашем случае перераспределение на основе вашего идентификатора поиска сделает его быстрее, если таблица 2 слишком велика для трансляции. Но ремонт имеет свою цену. Вы можете найти больше здесь - https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:%7E:text=One%20way%20to%20avoid.%20shuffles,затем%20broadcast%20to%20every%20executor.

Дайте мне знать, что вы думаете. с нетерпением жду обсуждения в этой теме.

Если вы не можете транслировать, пример избегания присоединения с использованием группирования - вдохновленный здесь: Spark: предотвращение перемешивания / обмена при объединении двух идентично разделенных фреймов данных

spark.catalog.setCurrentDatabase(<your databasename>)
test1.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100,'item').saveAsTable('table_item1')
#test1.

#%%
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) # this is just to disable auto broadcasting for testing
import pyspark.sql.functions as F
inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"),on='item')

Теперь попробуй

inputDf3.explain()

Результат будет примерно таким:

== Physical Plan ==
*(3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]
+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
   :- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
   :     +- *(1) Filter isnotnull(item#1033)
   :        +- *(1) FileScan parquet 
   +- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
      +- *(2) Project [id#1038, item#1039]
         +- *(2) Filter isnotnull(item#1039)
            +- *(2) FileScan parquet 

Как видите, здесь не происходит разбиения хэша Exchange. Так что попробуйте сгруппировать оба фрейма данных и попробуйте присоединиться.

person Raghu    schedule 04.07.2020
comment
спасибо, что поделились страницей. Очень полезный контент. Спасибо, что указали на трансляцию. Вместо того, чтобы объединять обе таблицы одновременно, я думаю о трансляции только lookup_id из table_2 и выполнении сканирования таблицы. Затем я присоединюсь к таблице_2, чтобы снова взять нужные столбцы. Однако я не уверен, будет ли широковещательное соединение (или любое соединение) принимать только те данные, которые удовлетворяют условию соединения. Я также ищу способ использовать поле раздела для распараллеливания операции сканирования таблицы. - person Mohan; 04.07.2020
comment
Я думаю, что Spark достаточно умен, чтобы управлять распараллеливанием ваших объединений на основе максимального количества назначенных вами исполнителей. Итак, вам просто нужно транслировать таблицу 2 на все разделы и выполнить соединение. Это позволит избежать перемешивания, а фильтрация и объединение будут эффективными. Если это поможет, подумайте о том, чтобы проголосовать за и принять ответ. - person Raghu; 04.07.2020
comment
Вместо того чтобы сначала прочитать все данные и выполнить соединение, если я пропущу отдельные create_dates в table_1 и попытаюсь присоединиться, будет ли это быстрее, поскольку первая таблица разбивается на разделы на основе даты создания? - person Mohan; 13.07.2020
comment
Я так не думаю, потому что перетасовка все равно произойдет. разделение на более мелкие работы работает, когда нет памяти для обработки задачи, но это не ускоряет процесс. Потому что в конце вы должны объединить их, и это тоже стоит денег, даже если перетасовки нет. с другой стороны, если вы присоединяетесь с помощью ключа раздела, перетасовки не происходит, и это ускоряет процесс. - person Raghu; 13.07.2020
comment
К сожалению, я не могу вывести create_date из второй таблицы, так как его нужно искать в table_1. Я не могу разделить по идентификатору, так как есть другие запросы, в которых используется объединение разделов на основе даты создания. Если данные кластеризованы на основе lookup_id в обеих таблицах, как вы думаете, произойдет ли перемешивание и сортировка? - person Mohan; 13.07.2020
comment
если обе таблицы перераспределены на основе идентификатора поиска, я не думаю, что перетасовка произойдет. Поскольку разделение искры имеет раздел, и те же значения окажутся в одном разделе, я думаю. - person Raghu; 13.07.2020
comment
Я хочу спросить, произойдет ли перемешивание, если lookup_id будет разделен как на table_1, так и на table_2. Раздел table_1 по-прежнему будет create_date, но в разделе create_date данные будут разделены на основе lookup_id. - person Mohan; 13.07.2020
comment
Нашел кое-что интересное - искра stackoverflow.com/questions/59034719/ Итак, согласно здесь, repartition или partitionBy не может избежать перетасовки. только ведро. Я пробовал это и считаю, что это правда, исходя из моего физического плана. поэтому в вашем случае ведите обе таблицы на основе lookupid и присоединяйтесь к нему. проверьте свой физический план - он не должен отображать разбиение хэша Exchange. Также см. Обновленный код в моем ответе - person Raghu; 14.07.2020
comment
Интересно! позвольте мне попробовать как с разделением, так и с сегментированием по отдельности на некоторых тестовых таблицах, чтобы увидеть, какая из них работает лучше всего. Спасибо за вашу помощь. Пожалуйста, подумайте о том, чтобы переформатировать свой ответ на основе ваших недавних выводов для будущих читателей. - person Mohan; 15.07.2020

Когда вы прочтете CSV ... он будет автоматически разделен и будет выполняться параллельная обработка ... на основе конфигурации по умолчанию (в случае, если мы не меняем ничего)

Конкретный ответ на это ... Если у вас есть несжатый текстовый файл размером 30 ГБ, хранящийся в HDFS, то при настройке размера блока HDFS по умолчанию (128 МБ) он будет храниться в 235 блоках, что означает, что RDD, который вы читаете из этого файла, будет 235 разделов.

Здесь есть две вещи 1. Плоские файлы, например, CSV, и 2. сжатый файл, например паркет.

  1. Когда у вас есть текстовый файл ... Когда Spark читает файл из HDFS, он создает один раздел для одного входного разделения. Разделение ввода устанавливается Hadoop InputFormat, используемым для чтения этого файла. Например, если вы используете textFile (), это будет TextInputFormat в Hadoop, который вернет вам один раздел для одного блока HDFS (но разделение между разделами будет выполняться по разделению строк, а не по точному разделению блока), если только у вас есть сжатый текстовый файл.

  2. Для паркетного или сжатого файла: в случае сжатого файла вы получите один раздел для одного файла (поскольку сжатые текстовые файлы не могут быть разделены).

Теперь, когда вы используете паркет, он уже хорошо разбит на разделы, при оптимизации вы можете проверить размер кластера, узнать, сколько разделов произошло и т. Д.

Итак, ответьте: Вопрос 1. Как распараллелить сканирование table_1 на основе разделов create_date? Это уже разделено

Для, Вопрос 2: есть ли другой способ оптимизировать сканирование table_1 на основе lookup_ids из table_2 и / или на основе разделов?

Вы можете попробовать отфильтровать ненужные записи. Эта концепция называется Spark predicate push down в Spark SQL-запросах, поэтому даже перед загрузкой данных в память Spark отфильтрует ненужные столбцы .. подробнее здесь

Передача предиката Spark в базу данных позволяет лучше оптимизировать запросы Spark. Предикат - это условие запроса, возвращающее истину или ложь, обычно находящееся в предложении WHERE. Смещение предиката вниз фильтрует данные в запросе к базе данных, уменьшая количество записей, извлекаемых из базы данных, и улучшая производительность запроса. По умолчанию Spark Dataset API автоматически отправляет действительные предложения WHERE в базу данных.

person dsk    schedule 07.07.2020
comment
Спасибо за ваш ответ. Я не могу использовать PPD, поскольку вторая таблица уже является дельта-таблицей, поэтому все фильтры уже применяются перед загрузкой этой таблицы. Мне нужно найти все идентификаторы поиска из дельта-таблицы. Что мне непонятно, так это то, что когда я выполняю шаги, которые я дал в моем вопросе, сначала будет искровое чтение таблицы и выполнено соединение в искровой памяти? или он попытается присоединиться при чтении самого себя? Если он попытается присоединиться при чтении самого себя, применимо ли это ко всем соединениям? или конкретно к какому-нибудь присоединяется? - person Mohan; 11.07.2020
comment
На самом деле это не так, на основе заданного фильтра данные будут отправлены исполнителю или исполнитель будет читать только эти данные, теперь все это соединение, count () - это преобразование в искре, поэтому логический план будет сначала создайте, опубликуйте это, будет создан окончательный физический план и будет разработана линия DAG, так что вы можете сказать, сначала произойдет фильтр, а затем преобразование; Надеюсь, это внесет ясность. - person dsk; 13.07.2020
comment
Итак, если данные считываются первыми, данные копируются в фреймы данных, а затем происходит соединение. Косвенно я ищу способ выполнить опускание предиката, используя вторую таблицу lookup_id, но вторая таблица также имеет 1,5 миллиона строк. Поэтому мне интересно, поможет ли широковещательное соединение искроить фильтрацию данных при чтении самого себя. Мой кластер имеет ограниченную емкость, поэтому я не могу одновременно использовать больше памяти исполнителя. Я хочу, чтобы соединение выполнялось эффективно. - person Mohan; 13.07.2020
comment
Mohan - широковещательное соединение не поможет вам отфильтровать данные, широковещательное соединение помогает уменьшить сетевой вызов, отправляя набор данных / делая доступным набор данных, который вы транслируете, каждому исполнителю / узлу в вашем кластере. Кроме того, 1,5 миллиона больших объемов данных - не такая уж большая нагрузка :) Надеюсь, это поможет .. - person dsk; 15.07.2020
comment
Замечательно, что это помогло вам ???? был бы признателен, если бы вы могли проголосовать тем временем - person dsk; 15.07.2020