Как лучше всего объединить несколько таблиц соединений jdbc в Spark?

Я пытаюсь перенести запрос в pyspark, и мне нужно объединить в нем несколько таблиц. Все рассматриваемые таблицы находятся в Redshift, и я использую соединитель jdbc, чтобы общаться с ними.

Моя проблема в том, как сделать эти соединения оптимальным образом, не читая слишком много данных (например, загружая таблицу и присоединяясь по ключу) и не просто явно используя:

spark.sql("""join table1 on x=y join table2 on y=z""")

Есть ли способ отправить запросы в Redshift, но по-прежнему использовать Spark df API для написания логики, а также использовать df из контекста искры, не сохраняя их в Redshift только для объединений?


person someRandomGuy    schedule 30.01.2020    source источник
comment
какой разъем вы используете? это github.com/databricks/spark-redshift   -  person Salim    schedule 01.02.2020
comment
@Salim Я использую JDBC-коннектор amazon Redshift, совместимый с JDBC 4.2, docs.aws.amazon.com/redshift/latest/mgmt/ Но я могу переключиться на драйвер Databricks, если это даст мне оптимальное решение.   -  person someRandomGuy    schedule 01.02.2020
comment
В документации говорится, что это предполагало нажатие на предикат. Я не использовал ни то, ни другое, чтобы представить практическую перспективу, но я могу говорить теоретически.   -  person Salim    schedule 01.02.2020


Ответы (1)


Пожалуйста, найдите следующие моменты, которые следует учитывать:

  • коннектор будет выталкивать указанные фильтры, только если в вашем коде Spark есть какой-либо фильтр. например select * from tbl where id > 10000. Вы можете убедиться в этом сами, просто проверьте ответственный Scala код. Также вот соответствующий тест, который и демонстрирует. Тест test("buildWhereClause with multiple filters") пытается проверить, что переменная expectedWhereClause равна whereClause, сгенерированной соединителем. Сгенерированное предложение where должно быть:
"""
        |WHERE "test_bool" = true
        |AND "test_string" = \'Unicode是樂趣\'
        |AND "test_double" > 1000.0
        |AND "test_double" < 1.7976931348623157E308
        |AND "test_float" >= 1.0
        |AND "test_int" <= 43
        |AND "test_int" IS NOT NULL
        |AND "test_int" IS NULL
      """

которое произошло из Spark-filters, указанного выше.

  • Драйвер поддерживает также column filtering. Это означает, что он загрузит только необходимые столбцы, сдвинув допустимые столбцы вниз до красного смещения. Вы можете еще раз убедиться в этом с помощью соответствующего Scala test (" DefaultSource поддерживает простую фильтрацию столбцов ") и test (" запрос с сокращенными и отфильтрованными сканированиями ").

  • Хотя в вашем случае вы не указали никаких фильтров в своем запросе на соединение, поэтому Spark не может использовать две предыдущие оптимизации. Если вы знаете о таких фильтрах, пожалуйста, не стесняйтесь их применять.

  • И последнее, но не менее важное: как уже упоминал Салим, официальный коннектор Spark для красного смещения можно найти здесь . Коннектор Spark построен на основе драйвера JDBC Amazon Redshift. поэтому он все равно попытается использовать его, как указано в code.

person abiratsis    schedule 01.02.2020
comment
Да, мне известно о том, что искры опускают фильтры в базу данных для оптимизации. В данном случае я не знаю фильтров. Таблицы фильтруются напрямую по условию соединения, и я не думаю, что извлечение таблицы и создание списка значений в столбце соединения и использование его для фильтрации другой таблицы - хорошая идея. Как говорится в вопросе, мне интересно найти способ запускать несколько объединений без простого написания sql и без сохранения df в памяти в Redshift только для одного соединения. Спасибо - person someRandomGuy; 02.02.2020
comment
Я думаю, что в этом случае (без какого-либо фильтра) результат будет таким же, как и в любой СУБД, которая должна сканировать обе таблицы на предмет ключей, а затем применять операцию соединения. Каждая СУБД имеет внутренние механизмы для применения оптимизаций во время крупных операций, таких как join, groupBy, sort и т. Д. Я не знаю ни одного механизма Spark, который использует эти функции СУБД. - person abiratsis; 02.02.2020
comment
Выше я указал на все доступные оптимизации для красного смещения, чтобы прояснить, какая часть драйвера красного смещения отвечает за какие оптимизации. Если вы проверите код соединителя и ответственные части (FilterPushdown.scala, RedshiftReadSuite.scala и т. Д.), Вы быстро поймете, что нет никакого фильтра, связанного с объединениями. - person abiratsis; 02.02.2020
comment
Правильно, какова ваша рекомендация в этом случае? Должен ли я сохранить искру df в таблицу и написать SQL-запрос для объединения и позволить Redshift максимально оптимизировать мой запрос? - person someRandomGuy; 03.02.2020
comment
Я бы порекомендовал найти одно или несколько полей для применения фильтрации. Это может быть что угодно: productId, categoryId, insertAt, modifiedAt и т. Д. Затем вы используете фильтр вместе с объединением. Например, если этот фильтр является целым числом, вы можете присоединиться к диапазонам 1-я партия 1-100, 2-я 101-200, 3-я 201-300 и т. Д. - person abiratsis; 03.02.2020