Почему я не мог правильно получить аккумулятор из экземпляра карты?

Первый метод мне интересно использовать Accumulator для вычисления количества строк "NULL" в разных столбцах, поэтому я пишу код Spark следующим образом (код упрощен), когда я ввожу некоторый ввод в операцию карты appData, я мог видеть вывод std в искре web ui значение аккумулятора увеличивается, но когда я хочу получить окончательное значение в драйвере, аккумуляторы всегда равны нулю, я буду признателен, если вы окажете мне услугу

val mapAC = collection.mutable.Map[String, LongAccumulator]()

for (ei <- eventList) { 
  val idNullCN = sc.longAccumulator(ei + "_idNullCN")
  mapAC.put(ei + "_idNullCN", idNullCN)
  val packNullCN = sc.longAccumulator(ei + "_packNullCN")
  mapAC.put(ei + "_packNullCN", packNullCN)
  val positionNullCN = sc.longAccumulator(ei + "_positionNullCN")
  mapAC.put(ei + "_positionNullCN", positionNullCN)
}

val mapBC = sc.broadcast(mapAC)

val res = appData.map(d => {
  val ei = d.eventId
  val map = mapBC.value
  if (d.id.toUpperCase == "NULL") map(ei + "_idNullCN").add(1)
  if (d.pack.toUpperCase == "NULL") map(ei + "_packNullCN").add(1)
  if (d.position.toUpperCase == "NULL") map(ei + "_positionNullCN").add(1)
  ei
})

res.count()

mapBC.value.foreach(ac=>{
  println(ac._1 + ": " + ac._2.value)
})

Второй метод. Я пробовал другой способ вычислить значение, создав такой аккумулятор карты.

import java.util
import java.util.Collections
import org.apache.spark.util.AccumulatorV2
import scala.collection.JavaConversions._

class CountMapAccumulator extends AccumulatorV2[String, java.util.Map[String, Long]] {
  private val _map = Collections.synchronizedMap(new util.HashMap[String, Long]())

  override def isZero: Boolean = _map.isEmpty

  override def copy(): CountMapAccumulator = {
    val newAcc = new CountMapAccumulator
    _map.synchronized {
      newAcc._map.putAll(_map)
    }
    newAcc
  }

  override def reset(): Unit = _map.clear()

  override def add(key: String): Unit =  _map.synchronized{_map.put(key, _map.get(key) + 1L)}

  override def merge(other: AccumulatorV2[String, java.util.Map[String, Long]]): Unit = other match {
    case o: CountMapAccumulator => for ((k, v) <- o.value) {
      val oldValue = _map.put(k, v)
      if (oldValue != null) {
        _map.put(k, oldValue.longValue() + v)
      }
    //   println("merge key: "+k+" old val: "+oldValue+" new Value: "+v+" current val: "+_map.get(k))
    }
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: util.Map[String, Long] = _map.synchronized {
    java.util.Collections.unmodifiableMap(new util.HashMap[String, Long](_map))
  }

  def setValue(value: Map[String, Long]): Unit = {
    val newValue = mapAsJavaMap(value)
    _map.clear()
    _map.putAll(newValue)
  }
}

затем я вызываю его следующим образом

val tmpMap = collection.mutable.Map[String, Long]()

for (ei <- eventList) {
  tmpMap.put(ei + "_idNullCN", 0L)
  tmpMap.put(ei + "_packNullCN", 0L)
  tmpMap.put(ei + "_positionNullCN", 0L)
}
val accumulator = new CountMapAccumulator
accumulator.setValue(collection.immutable.Map[String,Long](tmpMap.toSeq:_*))
sc.register(accumulator, "CustomAccumulator")

val res = appData.map(d => {
  val ei = d.eventId
  if (d.id.toUpperCase == "NULL") accumulator.add(ei + "_idNullCN")
  if (d.pack.toUpperCase == "NULL") accumulator.add(ei + "_packNullCN")
  if (d.position.toUpperCase == "NULL") accumulator.add(ei + "_positionNullCN")
  if (d.modulePos.toUpperCase == "NULL") accumulator.add(ei + "_modulePosNullCN")
  ei
})

res.count()
accumulator.value.foreach(println)

но значение аккумулятора по-прежнему равно нулю


второй метод верен, так как программа заканчивается правильно, я не проверял журнал, после того, как я посмотрел, я обнаружил эту ОШИБКУ java.lang.UnsupportedOperationException: Cannot merge $line105198665522.$read$$iw$$iw$CountMapAccumulator with $line105198665522.$read$$iw$$iw$CountMapAccumulator, поэтому я меняю код сопоставления с образцом метода слияния, подобный этому

override def merge(other: AccumulatorV2[String, java.util.Map[String, Long]]): Unit = other match {
    case o: AccumulatorV2[String, java.util.Map[String, Long]] => for ((k, v) <- o.value) {
        val oldValue: java.lang.Long = _map.get(k)
        if (oldValue != null) {
        _map.put(k, oldValue.longValue() + v)
        } else {
        _map.put(k, v)
        }
        println(s"key: ${k} oldValue: ${oldValue} newValue: ${v} finalValue: ${_map.get(k)}")
    }
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

после изменения типа o он, наконец, работает, но меня все еще смущает, что ведет себя первый способ.


person Js_zero    schedule 10.12.2019    source источник
comment
Вы не можете получить измененные значения транслируемого контейнера после выполнения. Вещание с использованием для передачи некоторых полезных данных исполнителям для оценки результата, который вернется. Вы можете просто что-то вычислить и явно вернуть из распределенной оценки. Не как побочный эффект, просто результат чистой функции. Узнайте больше о парадигме Spark и Map Reduce.   -  person Boris Azanov    schedule 10.12.2019
comment
Я предполагаю, что вы думаете, что mapAC - это временный контейнер для каких-то временных результатов, в искре это не работает.   -  person Boris Azanov    schedule 10.12.2019
comment
@Boris Azanov я не менял карту, я просто использую карту, чтобы получить аккумулятор по имени с карты. Я думаю, что аккумуляторы должны правильно рассчитывать значение, не так ли? что бы то ни было, когда я перехожу на второй способ, кажется, что все идет не так   -  person Js_zero    schedule 10.12.2019
comment
Я попробовал протестировать ваш первый подход локально, и это были правильные значения.   -  person Boris Azanov    schedule 11.12.2019
comment
@ Борис Азанов, я думаю, есть большая разница между локальным режимом и кластерным режимом.   -  person Js_zero    schedule 18.12.2019


Ответы (1)


в вашем пользовательском аккумуляторе у вас есть ошибка в функции слияния, посмотрите правильно:

val oldValue: java.lang.Long = _map.get(k)
if (oldValue != null) {
  _map.put(k, oldValue.longValue() + v)
} else {
  _map.put(k, v)
}
person Boris Azanov    schedule 10.12.2019
comment
я нашел ошибку и наконец исправил ее! вопросы обновлены, спасибо за помощь - person Js_zero; 11.12.2019