Медленное чтение Spark из таблицы Postgres JDBC

Я пытаюсь загрузить в Spark около 1 млн строк из базы данных PostgreSQL. При использовании Spark требуется около 10 секунд. Однако загрузка того же запроса с использованием драйвера psycopg2 занимает 2 секунды. Я использую драйвер postgresql jdbc версии 42.0.0

def _loadFromPostGres(name):
    url_connect = "jdbc:postgresql:"+dbname
    properties = {"user": "postgres", "password": "postgres"}
    df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
    return df

df = _loadFromPostGres("""
    (SELECT "seriesId", "companyId", "userId", "score" 
    FROM user_series_game 
    WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

Выход -

--- 10.7214591503 seconds ---
1076131

Использование psycopg2 -

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
    cur.execute("""(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298)""")
    return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

Выход -

--- 2.27961301804 seconds ---
1076131

Функция измерения -

def measure(func) :
    start_time = time.time()
    x = func()
    print("--- %s seconds ---" % (time.time() - start_time))
    return x

Пожалуйста, помогите мне найти причину этой проблемы.


Редактировать 1

Я сделал еще несколько тестов. Использование Scala и JDBC -

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long, Long, Long, Double)]()

while (rs.next()) {
    val seriesId = rs.getLong("seriesId")
    val companyId = rs.getLong("companyId")
    val userId = rs.getLong("userId")
    val score = rs.getDouble("score")
    list.append((seriesId, companyId, userId, score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

Результат был -

Elapsed time: 1.922102285s
1143402

Когда я собрал () в Spark + Scala -

import org.apache.spark.sql.SparkSession

def exec2() {

    val spark = SparkSession.builder().getOrCreate()

    val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

    val sqlText = """(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298) as user_series_game"""

    val t0 = System.nanoTime()

    val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", sqlText)
          .option("user", "postgres")
          .option("password", "postgres")
          .load()

    val list = df.collect()

    val t1 = System.nanoTime()

    println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

    print (list.size)
}

exec2()

Результат был

Elapsed time: 1.486141076s
1143445

Таким образом, на сериализацию Python тратится в 4 раза больше времени. Я понимаю, что будет какое-то наказание, но это кажется слишком большим.


person Abhijit Bhole    schedule 21.04.2017    source источник


Ответы (1)


Причина действительно проста и имеет две одновременные причины.

Сначала я расскажу вам, как работает psycopg2.

Эта библиотека psycopg2 работает как любая другая библиотека для подключения к RDMS. Эта библиотека отправит запрос движку вашего postgres и вернет вам данные. Прямо так.

Conn -> Запрос -> ReturnData -> FetchData

Когда вы используете искру, это немного отличается в двух отношениях. Spark не похож на язык программирования, работающий в одном потоке. У него есть распределенная система для работы. Даже если вы работаете на локальной машине. См. Spark имеет базовую концепцию Драйвер (Мастер) и Рабочие.

Драйвер получает запрос на выполнение запроса к Postgres, драйвер не будет запрашивать данные для каждого рабочего, запрашивающего информацию из вашего Postgres.

Если вы видите документацию, здесь вы увидите такую ​​заметку:

Не создавайте слишком много разделов параллельно в большом кластере; в противном случае Spark может привести к сбою ваших внешних систем баз данных.

Это примечание означает, что каждый работник будет отвечать за запрос данных для вашего postgres. Это небольшие накладные расходы на запуск этого процесса, но ничего особенного. Но здесь есть накладные расходы, чтобы отправить данные каждому работнику.

Второй момент, ваш сбор в этой части кода:

print measure(lambda : len(df.collect()))

Функция сбора отправит всем вашим воркерам команду отправить данные вашему драйверу. Чтобы сохранить в памяти вашего драйвера, это похоже на Reduce, он создает Shuffle в середине процесса. Перемешивание - это шаг процесса, на котором данные отправляются другим рабочим. В случае сбора каждый работник отправит его вашему водителю.

Итак, шаги Spark в JDBC вашего кода:

(Рабочие) Conn -> (Workers) Query -> (Workers) FetchData -> (Driver) Request the Data -> (Workers) Shuffle -> (Driver) Collect

Ну, там есть куча других вещей, которые происходят со Spark, такие как QueryPlan, сборка DataFrame и другие вещи.

Это причина того, что у вас более быстрый ответ в вашем простом коде Python, чем в Spark.

person Thiago Baldim    schedule 23.04.2017
comment
мы сталкиваемся со значительными проблемами при загрузке данных из postgresql в Spark. По сути, наша идея состоит в том, чтобы загрузить все данные в драйвер в фрейм данных pandas и преобразовать их в фрейм данных Spark, а затем запустить Spark Distributed. что ты предлагаешь ? - person Sandeep; 12.07.2017
comment
Пожалуйста, не загружайте все данные в Pandas, это будет плохо. Если у вас есть Spark Cluster, вы должны использовать инструмент JDBC of python для загрузки данных из postgres, чтобы загрузить данные прямо для рабочих. spark.apache.org/ документы / последний / api / python / - person Thiago Baldim; 12.07.2017