У меня есть такой код:
println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.
foreachPartition(iter => {
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
while(iter.hasNext) {
val item = iter.next()
//println(item(0))
println("String: "+item(0).toString())
val jsonStr = DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
map { resultSet => resultSet.string(1) }.single.apply()
}
println("\nJSON: "+jsonStr)
}
})
println("\nEND Last Revs Class: "+ lastRevs.getClass)
Код выводит (с большими правками) что-то вроде:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void
ВОПРОС 1: Как я могу получить значение lastRevs в полезном формате, таком как строка JSON/null или опция, такая как Some / None?
ВОПРОС 2: Мое предпочтение: есть ли другой способ получить данные разделов в формате, подобном RDD (а не в формате итератора)?
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
из http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
ВОПРОС 3: Является ли метод получения данных, который я использую, разумным методом (учитывая, что я перехожу по ссылке выше)? (Отбросим тот факт, что прямо сейчас это масштабируемая система JDBC jdbc. Это будет хранилище ключей и значений какого-то типа, отличного от этого прототипа.)
lastRevs
должно бытьUnit
, потому что.forEachPartition
используется только для его побочного эффекта (функция T=›Unit). Я думаю, вы хотите преобразовать данные, например, используя вместо этогоmapPartitions
. Я хотел бы понять, какова здесь общая цель, потому что отдельные вопросы не имеют особого смысла (для меня) - person maasg   schedule 30.04.2016