Spark Scala возвращает данные из rdd.foreachPartition

У меня есть такой код:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)

Код выводит (с большими правками) что-то вроде:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void

ВОПРОС 1: Как я могу получить значение lastRevs в полезном формате, таком как строка JSON/null или опция, такая как Some / None?

ВОПРОС 2: Мое предпочтение: есть ли другой способ получить данные разделов в формате, подобном RDD (а не в формате итератора)?

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  } 
}

из http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

ВОПРОС 3: Является ли метод получения данных, который я использую, разумным методом (учитывая, что я перехожу по ссылке выше)? (Отбросим тот факт, что прямо сейчас это масштабируемая система JDBC jdbc. Это будет хранилище ключей и значений какого-то типа, отличного от этого прототипа.)


person codeaperature    schedule 30.04.2016    source источник
comment
Я не понимаю вопроса. lastRevs должно быть Unit, потому что .forEachPartition используется только для его побочного эффекта (функция T=›Unit). Я думаю, вы хотите преобразовать данные, например, используя вместо этого mapPartitions. Я хотел бы понять, какова здесь общая цель, потому что отдельные вопросы не имеют особого смысла (для меня)   -  person maasg    schedule 30.04.2016
comment
@maasg: Да. Это ответ, который я ищу - mapPartitions. Я нашел еще один пример по адресу stackoverflow.com/questions/21698443/.   -  person codeaperature    schedule 30.04.2016


Ответы (1)


Чтобы создать преобразование, использующее ресурсы, локальные для исполнителя (например, базу данных или сетевое соединение), следует использовать rdd.mapPartitions. Это позволяет инициализировать некоторый код локально для исполнителя и использовать эти локальные ресурсы для обработки данных в разделе.

Код должен выглядеть так:

 val lastRevs = distinctFileGidsRDD.
        mapPartitions{iter => 
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          iter.map{ element => 
            DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
              .map { resultSet => resultSet.string(1) }.single.apply()
            }
          }
        }
person maasg    schedule 30.04.2016
comment
вы имеете в виду, что он отличается от foreachPartition тем, что использует ресурсы Исполнителя вместо ресурсов Драйвера? Т.е. код foreachPartition выполняется на Драйвере, тогда как mapPartitions на Исполнителе... правильно? - person lisak; 26.05.2016
comment
@lisak Нет, и foreachPartition, и mapPartitions позволят вам запускать код на исполнителях. Разница в том, что foreachPartition имеет только побочные эффекты (например, запись в БД), а mapPartitions возвращает значение. Ключом к этому вопросу является «как вернуть данные», поэтому mapPartitions - это путь. - person maasg; 26.05.2016
comment
@maasg У меня есть такой код ' val company_model_vals_df = riched_company_model_vals_df.repartition(col(model_id),col(fiscal_quarter),col(fiscal_year)) company_model_vals_df.foreachPartition(writeAsParquet(dataframe) /// будет писать как паркет в hdfs // // Но как здесь использовать foreachPartition? ) - person BdEngineer; 16.01.2019