Apache Spark JOIN с динамическим повторным разделением

Я пытаюсь сделать довольно простое соединение двух таблиц, ничего сложного. Загрузите обе таблицы, выполните соединение и обновите столбцы, но он продолжает генерировать исключение.

Я заметил, что задача застревает на последнем разделе 199/200 и в конечном итоге вылетает. Я подозреваю, что данные искажены, что приводит к загрузке всех данных в последний раздел 199.

SELECT COUNT(DISTINCT report_audit) FROM ReportDs = 1.5million.

Пока

SELECT COUNT(*) FROM ReportDs = 57million.

Детали кластера: ЦП: 40 ядер, Память: 160 ГБ.

Вот мой пример кода:

...
def main(args: Array[String]) {

  val log = LogManager.getRootLogger
  log.setLevel(Level.INFO)

  val conf = new SparkConf().setAppName("ExampleJob")
                          //.setMaster("local[*]")
                          //.set("spark.sql.shuffle.partitions", "3000")
                          //.set("spark.sql.crossJoin.enabled", "true")
                          .set("spark.storage.memoryFraction", "0.02")
                          .set("spark.shuffle.memoryFraction", "0.8")
                          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                          .set("spark.default.parallelism", (CPU * 3).toString)


  val sparkSession = SparkSession.builder()
                                 .config(conf)
                                 .getOrCreate()


  val reportOpts = Map(
              "url"     -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
              "driver"  -> "org.postgresql.Driver",
              "dbtable" -> "REPORT_TBL",
              "user"    -> DB_USER,
              "password"-> DB_PASSWORD,
              "partitionColumn" -> RPT_NUM_PARTITION,
              "lowerBound" -> RPT_LOWER_BOUND,
              "upperBound" -> RPT_UPPER_BOUND,
              "numPartitions" -> "200"
            )


  val accountOpts = Map(
                "url"     -> s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE",
                "driver"  -> "org.postgresql.Driver",
                "dbtable" -> ACCOUNT_TBL,
                "user"    -> DB_USER,
                "password"-> DB_PASSWORD,
                "partitionColumn" -> ACCT_NUM_PARTITION,
                "lowerBound" -> ACCT_LOWER_BOUND,
                "upperBound" -> ACCT_UPPER_BOUND,
                "numPartitions" -> "200"
              )

  val sc = sparkSession.sparkContext;

  import sparkSession.implicits._

  val reportDs = sparkSession.read.format("jdbc").options(reportOpts).load.cache().alias("a")

  val accountDs = sparkSession.read.format("jdbc").options(accountOpts).load.cache().alias("c")

  val reportData =  reportDs.join(accountDs, reportDs("report_audit") === accountDs("reference_id"))
                                        .withColumn("report_name", when($"report_id" === "xxxx-xxx-asd", $"report_id_ref_1")
                                                                   .when($"report_id" === "demoasd-asdad-asda", $"report_id_ref_2")
                                                                   .otherwise($"report_id_ref_1" + ":" + $"report_id_ref_2"))
                                        .withColumn("report_version", when($"report_id" === "xxxx-xxx-asd", $"report_version_1")
                                                                       .when($"report_id" === "demoasd-asdad-asda", $"report_version_2")
                                                                       .otherwise($"report_version_3"))
                                        .withColumn("status", when($"report_id" === "xxxx-xxx-asd", $"report_status")
                                                                .when($"report_id" === "demoasd-asdad-asda", $"report_status_1")
                                                                .otherwise($"report_id"))
                                        .select("...")






  val prop = new Properties()
  prop.setProperty("user", DB_USER)
  prop.setProperty("password", DB_PASSWORD)
  prop.setProperty("driver", "org.postgresql.Driver")


  reportData.write
                  .mode(SaveMode.Append)
                  .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", "cust_report_data", prop)


  sparkSession.stop()

Я думаю, что должен быть элегантный способ справиться с такой асимметрией данных.


person Adetiloye Philip Kehinde    schedule 02.12.2016    source источник
comment
Ваши значения для partitionColumn, upperBound и lowerBound могут вызвать такое же поведение, если они установлены неправильно. Например, если lowerBound == upperBound, то все данные будут загружены в один раздел, независимо от numPartitions.   -  person Travis Hegner    schedule 02.12.2016
comment
@TravisHegner - Чёрт возьми! Я только что совершил самую большую ошибку "partitionColumn" -> RPT_NUM_PARTITION,, это глупое дело!   -  person Adetiloye Philip Kehinde    schedule 02.12.2016
comment
Потрясающий. Я отвечу.   -  person Travis Hegner    schedule 02.12.2016


Ответы (1)


Ваши значения для partitionColumn, upperBound и lowerBound могут вызвать такое же поведение, если они установлены неправильно. Например, если lowerBound == upperBound, то все данные будут загружены в один раздел, независимо от numPartitions.

Комбинация этих атрибутов определяет, какие (или сколько) записей будут загружены в ваши DataFrame разделы из вашей базы данных SQL.

person Travis Hegner    schedule 02.12.2016