Проблема с подсчетом фреймов данных JDBC в Apache Spark

Я использую Spark JDBC для чтения данных из базы данных MS SQL, но получаю странные результаты.

Например, ниже мой код для чтения записей из моей базы данных MS SQL. Обратите внимание, что таблицы, из которых я читаю данные, постоянно вставляются с записями.

 //Extract Data from JDBC source
    val jdbcTable = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" ->
          s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
       .load

     println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")

    val updateJdbcDF = jdbcTable
      .withColumn("ID-COL1", trim($"COl1"))
      .withColumn("ID-COL1", trim($"COl2"))

   println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")

Я получаю 2 разных значения счетчика каждый раз, когда запускаю свою программу, например, я всегда получаю ${updateJdbcDF.count()} count> ${jdbcTable.count()}.

Может кто-нибудь объяснить мне, почему это происходит? В моем случае это создает множество проблем. Как ограничить количество jdbcTable DataFrame после его создания. Я пробовал jdbcTable.cache(), но безуспешно.

Записи становятся все больше и больше, когда я использую любую операцию с другим фреймом данных, полученным из jdbcTable DataFrame. Вызывает jdbcTable фрейм данных каждый раз, когда я использую любой фрейм данных, полученный из jdbcTable фрейма данных.


person nilesh1212    schedule 21.09.2017    source источник
comment
Постоянна ли разница? Или вы каждый раз получаете разные счета для обоих утверждений?   -  person philantrovert    schedule 21.09.2017
comment
@philantrovert Никакая разница не постоянная, у меня каждый раз разные подсчеты ..   -  person nilesh1212    schedule 21.09.2017
comment
Что ж, если таблицы, из которых я читаю данные, постоянно вставляются с записями, и ваш запрос не определяет предикат фиксированного диапазона, тогда количество строк в таблице будет разным каждый раз, когда Sparks обращается к нему. Так что то, что вы видите (изменение количества), вполне ожидаемо, не так ли?   -  person GPI    schedule 21.09.2017
comment
@GPI, хорошо, это означает, что если мой предикат отсутствует, то всякий раз, когда я использую какой-либо фрейм данных, полученный из фрейма данных jdbcTable, искра будет снова читать из БД и всегда будет давать мне более высокий счет, верно ?? Собственно, это то, что я наблюдаю из своего кода, jdbcTable - это первый DF, и когда я добавляю несколько столбцов и создаю другой DF, например updateJdbcDF; счетчик увеличивается, поскольку искра перечитала таблицу.   -  person nilesh1212    schedule 21.09.2017
comment
@ nilesh1212 это серая зона. Spark сделает все возможное, чтобы не пересчитывать никакие RDD / фреймы данных, которых он может избежать (и даже позволяет вам кэшировать их). Но он вернется к хранилищам данных (будь то базы данных через JDBC, файлы Parquet в HDFS, ...), если ему придется пересчитать часть своего графа выполнения, которая не была кэширована. Какие виды противоречий с некоторыми базовыми предположениями можно сделать: RDD (и, соответственно, фреймы данных) должны быть неизменными, но если они поддерживаются динамическими данными, они не могут быть такими, и нужно кодировать в соответствии с природой их источник.   -  person GPI    schedule 21.09.2017
comment
@GPI, Спасибо за ответ, мне удалось исправить эту проблему, применив jdbcTable.cache (). Теперь любой DF, полученный из фрейма данных jdbcTable, не дает мне большего количества, чем jdbcTable.count (). Все расчеты сейчас в порядке.   -  person nilesh1212    schedule 21.09.2017
comment
Хорошо, просто будьте осторожны, если вы когда-нибудь столкнетесь с ситуацией, когда ваш фрейм данных не может быть кэширован (недостаточно ОЗУ?), Spark может вернуться к серверу SQL и повторно запросить. Чтобы быть более уверенным, использование upperBound для запрашиваемых элементов (date или id) может помочь. В конце концов, это ваш звонок :-).   -  person GPI    schedule 21.09.2017
comment
@GPI Конечно, я обработаю и упомянутый сценарий, обновлю свой ответ соответственно Спасибо :)   -  person nilesh1212    schedule 21.09.2017


Ответы (1)


Мне удалось исправить эту проблему, применив jdbcTable.cache(). Теперь любой DF, полученный из фрейма данных jdbcTable, не дает мне большего количества, чем jdbcTable.count(). Все расчеты сейчас в порядке. Спасибо за объяснение @GPI

//Extract Data from JDBC source
    val jdbcTable = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" ->
          s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
       .load

    jdbcTable.cache()

     println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")


    val updateJdbcDF = jdbcTable
      .withColumn("ID-COL1", trim($"COl1"))
      .withColumn("ID-COL1", trim($"COl2"))

   println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")
   /**
     * MORE DATA PROCESSING HERE
   /**

  jdbcTable.unpersist()
person nilesh1212    schedule 21.09.2017