Лучший способ сканирования данных с использованием scala и spark

Проблема

Входные данные имеют 2 типа записей, назовем их R и W.

Мне нужно пройти эти данные в последовательности сверху вниз таким образом, чтобы, если текущая запись имеет тип W, она должна быть объединена с картой (давайте назовем ее workMap). Если ключ этой записи W-типа уже присутствует в карте, то к нему добавляется значение этой записи, иначе в workMap делается новая запись.

Если текущая запись имеет тип R, workMap, рассчитанный до этой записи, присоединяется к текущей записи.

Например, если это порядок записей -

W1-   a -> 2
W2-   b -> 3
W3-   a -> 4
R1 
W4-   c -> 1
R2
W5-   c -> 4

Где W1, W2, W3, W4 и W5 имеют тип W; А R1 и R2 имеют тип R

В конце этой функции у меня должно быть следующее:

R1 - { a -> 6, 
       b -> 3 } //merged(W1, W2, W3)
R2 - { a -> 6, 
       b -> 3,
       c -> 1 } //merged(W1, W2, W3, W4)
{ a -> 6, 
  b -> 3,
  c -> 5 } //merged(W1, W2, W3, W4, W5)

Я хочу, чтобы все записи R-типа, прикрепленные к промежуточным workMap, были рассчитаны до этого момента; И окончательный workMap после обработки последней записи.

Вот код, который я написал -

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
  Iterator[(ReportKey, ReportVal)] = {

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
    val reportList = mutable.ArrayBuffer.empty[(ReportKey, Reportval)]

    while (itr.hasNext) {
      val temp = itr.next()
      val (iKey, iVal) = (temp._1, temp._2)

      if (iKey.recordType == reportType) {
       //creates a new (ReportKey, Reportval)
        reportList += getNewReportRecord(workMap, iKey, iVal) 
      }
      else {
        //if iKey is already present, merge the values 
        //other wise adds a new entry
        updateWorkMap(workMap, iKey, iVal) 
      }
    }
    val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport)

    reportList.iterator ++ workList.iterator
  }

ReportKey класс такой -

case class ReportKey (
                        // the type of record - report or work 
                        rType: Int, 
                        date: String, 
                      .....
                       )

У этого подхода есть две проблемы, о которых я прошу помощи:

  1. Я должен отслеживать reportList - список записей типа R, прикрепленных к промежуточным workMap. По мере роста данных reportList также растет, и я натыкаюсь на OutOfMemoryExceptions.
  2. Я должен объединить записи reportList и workMap в одной структуре данных, а затем вернуть их. Если есть какой-то другой элегантный способ, я бы определенно подумал об изменении этого дизайна.

Для полноты картины я использую искру. Функция calcPerPartition передается в качестве аргумента для mapPartitions в RDD. Мне нужно workMaps из каждого раздела, чтобы позже выполнить некоторые дополнительные вычисления.

Я знаю, что если мне не нужно возвращать workMaps из каждого раздела, проблема становится намного проще, например:

...
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]                     
itr.scanLeft[Option[(ReportKey, Reportval)]](
  None)((acc: Option[(ReportKey, Reportval)], 
  curr: (InputKey, InputVal)) => {

  if (curr._1.recordType == reportType) {
    val rec = getNewReportRecord(workMap, curr._1, curr._2)
    Some(rec)
  }
  else {
    updateWorkMap(workMap, curr._1, curr._2)
    None
  }
})

val reportList = scan.filter(_.isDefined).map(_.get)
//workMap is still empty after the scanLeft. 
... 

Конечно, я могу выполнить операцию reduce над входными данными, чтобы получить окончательное workMap, но мне нужно будет просмотреть данные дважды. Учитывая, что набор входных данных огромен, я тоже хочу этого избежать.

Но, к сожалению, мне нужны workMaps на последнем этапе.

Итак, есть ли лучший способ решить вышеуказанную проблему? Если я вообще не могу решить проблему 2 (в соответствии с этим), есть ли другой способ избежать сохранения R записей (reportList) в списке или сканирования данных более одного раза?


person Phani Rahul    schedule 25.03.2017    source источник


Ответы (1)


У меня еще нет лучшего дизайна для второго вопроса - если вы можете избежать объединения reportList и workMap в единую структуру данных, но мы, безусловно, можем избежать хранения записей типа R в списке.

Вот как мы можем переписать calcPerPartition из приведенного выше вопроса:

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
  Iterator[Option[(ReportKey, ReportVal)]] = {

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
    var finalWorkMap = true

    new Iterator[Option[(ReportKey, ReportVal)]](){
        override def hasNext: Boolean = itr.hasNext

        override def next(): Option[(ReportKey, ReportVal)] = {
            val curr = itr.next()
            val iKey = curr._1
            val iVal = curr._2
            val eventKey = EventKey(openKey.date, openKey.symbol)

            if (iKey.recordType == reportType) {
              Some(getNewReportRecord(workMap, iKey, iVal))
            }
            else {
              //otherwise update the generic interest map but don't accumulate anything
              updateWorkMap(workMap, iKey, iVal)
              if (itr.hasNext) {
                next()
              }
              else {
                  if(finalWorkMap){
                    finalWorkMap = false //because we want a final only once
                    Some(workMap.map(convertToReport))
                  }
                  else {
                    None
                  }

              }
            }
        }
    }
  }

Вместо сохранения результатов в списке мы определили итератор. Это решило большинство проблем с памятью, которые у нас были по этой проблеме.

person Phani Rahul    schedule 06.09.2017