Проблема
Входные данные имеют 2 типа записей, назовем их R
и W
.
Мне нужно пройти эти данные в последовательности сверху вниз таким образом, чтобы, если текущая запись имеет тип W
, она должна быть объединена с картой (давайте назовем ее workMap
). Если ключ этой записи W-типа уже присутствует в карте, то к нему добавляется значение этой записи, иначе в workMap
делается новая запись.
Если текущая запись имеет тип R
, workMap
, рассчитанный до этой записи, присоединяется к текущей записи.
Например, если это порядок записей -
W1- a -> 2
W2- b -> 3
W3- a -> 4
R1
W4- c -> 1
R2
W5- c -> 4
Где W1, W2, W3, W4 и W5 имеют тип W
; А R1 и R2 имеют тип R
В конце этой функции у меня должно быть следующее:
R1 - { a -> 6,
b -> 3 } //merged(W1, W2, W3)
R2 - { a -> 6,
b -> 3,
c -> 1 } //merged(W1, W2, W3, W4)
{ a -> 6,
b -> 3,
c -> 5 } //merged(W1, W2, W3, W4, W5)
Я хочу, чтобы все записи R-типа, прикрепленные к промежуточным workMap
, были рассчитаны до этого момента; И окончательный workMap
после обработки последней записи.
Вот код, который я написал -
def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
Iterator[(ReportKey, ReportVal)] = {
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
val reportList = mutable.ArrayBuffer.empty[(ReportKey, Reportval)]
while (itr.hasNext) {
val temp = itr.next()
val (iKey, iVal) = (temp._1, temp._2)
if (iKey.recordType == reportType) {
//creates a new (ReportKey, Reportval)
reportList += getNewReportRecord(workMap, iKey, iVal)
}
else {
//if iKey is already present, merge the values
//other wise adds a new entry
updateWorkMap(workMap, iKey, iVal)
}
}
val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport)
reportList.iterator ++ workList.iterator
}
ReportKey
класс такой -
case class ReportKey (
// the type of record - report or work
rType: Int,
date: String,
.....
)
У этого подхода есть две проблемы, о которых я прошу помощи:
- Я должен отслеживать
reportList
- список записей типаR
, прикрепленных к промежуточнымworkMap
. По мере роста данныхreportList
также растет, и я натыкаюсь наOutOfMemoryException
s. - Я должен объединить записи
reportList
иworkMap
в одной структуре данных, а затем вернуть их. Если есть какой-то другой элегантный способ, я бы определенно подумал об изменении этого дизайна.
Для полноты картины я использую искру. Функция calcPerPartition
передается в качестве аргумента для mapPartitions в RDD. Мне нужно workMap
s из каждого раздела, чтобы позже выполнить некоторые дополнительные вычисления.
Я знаю, что если мне не нужно возвращать workMap
s из каждого раздела, проблема становится намного проще, например:
...
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
itr.scanLeft[Option[(ReportKey, Reportval)]](
None)((acc: Option[(ReportKey, Reportval)],
curr: (InputKey, InputVal)) => {
if (curr._1.recordType == reportType) {
val rec = getNewReportRecord(workMap, curr._1, curr._2)
Some(rec)
}
else {
updateWorkMap(workMap, curr._1, curr._2)
None
}
})
val reportList = scan.filter(_.isDefined).map(_.get)
//workMap is still empty after the scanLeft.
...
Конечно, я могу выполнить операцию reduce
над входными данными, чтобы получить окончательное workMap
, но мне нужно будет просмотреть данные дважды. Учитывая, что набор входных данных огромен, я тоже хочу этого избежать.
Но, к сожалению, мне нужны workMap
s на последнем этапе.
Итак, есть ли лучший способ решить вышеуказанную проблему? Если я вообще не могу решить проблему 2 (в соответствии с этим), есть ли другой способ избежать сохранения R
записей (reportList
) в списке или сканирования данных более одного раза?