Я пытаюсь загрузить в Spark около 1 млн строк из базы данных PostgreSQL. При использовании Spark требуется около 10 секунд. Однако загрузка того же запроса с использованием драйвера psycopg2 занимает 2 секунды. Я использую драйвер postgresql jdbc версии 42.0.0
def _loadFromPostGres(name):
url_connect = "jdbc:postgresql:"+dbname
properties = {"user": "postgres", "password": "postgres"}
df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
return df
df = _loadFromPostGres("""
(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as
user_series_game""")
print measure(lambda : len(df.collect()))
Выход -
--- 10.7214591503 seconds ---
1076131
Использование psycopg2 -
import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
def _exec():
cur.execute("""(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298)""")
return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()
Выход -
--- 2.27961301804 seconds ---
1076131
Функция измерения -
def measure(func) :
start_time = time.time()
x = func()
print("--- %s seconds ---" % (time.time() - start_time))
return x
Пожалуйста, помогите мне найти причину этой проблемы.
Редактировать 1
Я сделал еще несколько тестов. Использование Scala и JDBC -
import java.sql._;
import scala.collection.mutable.ArrayBuffer;
def exec() {
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val conn = DriverManager.getConnection(url,"postgres","postgres");
val sqlText = """SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298"""
val t0 = System.nanoTime()
val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val rs = stmt.executeQuery()
val list = new ArrayBuffer[(Long, Long, Long, Double)]()
while (rs.next()) {
val seriesId = rs.getLong("seriesId")
val companyId = rs.getLong("companyId")
val userId = rs.getLong("userId")
val score = rs.getDouble("score")
list.append((seriesId, companyId, userId, score))
}
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
println(list.size)
rs.close()
stmt.close()
conn.close()
}
exec()
Результат был -
Elapsed time: 1.922102285s
1143402
Когда я собрал () в Spark + Scala -
import org.apache.spark.sql.SparkSession
def exec2() {
val spark = SparkSession.builder().getOrCreate()
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val sqlText = """(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as user_series_game"""
val t0 = System.nanoTime()
val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", sqlText)
.option("user", "postgres")
.option("password", "postgres")
.load()
val list = df.collect()
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
print (list.size)
}
exec2()
Результат был
Elapsed time: 1.486141076s
1143445
Таким образом, на сериализацию Python тратится в 4 раза больше времени. Я понимаю, что будет какое-то наказание, но это кажется слишком большим.