Первый метод мне интересно использовать Accumulator для вычисления количества строк "NULL" в разных столбцах, поэтому я пишу код Spark следующим образом (код упрощен), когда я ввожу некоторый ввод в операцию карты appData
, я мог видеть вывод std в искре web ui значение аккумулятора увеличивается, но когда я хочу получить окончательное значение в драйвере, аккумуляторы всегда равны нулю, я буду признателен, если вы окажете мне услугу
val mapAC = collection.mutable.Map[String, LongAccumulator]()
for (ei <- eventList) {
val idNullCN = sc.longAccumulator(ei + "_idNullCN")
mapAC.put(ei + "_idNullCN", idNullCN)
val packNullCN = sc.longAccumulator(ei + "_packNullCN")
mapAC.put(ei + "_packNullCN", packNullCN)
val positionNullCN = sc.longAccumulator(ei + "_positionNullCN")
mapAC.put(ei + "_positionNullCN", positionNullCN)
}
val mapBC = sc.broadcast(mapAC)
val res = appData.map(d => {
val ei = d.eventId
val map = mapBC.value
if (d.id.toUpperCase == "NULL") map(ei + "_idNullCN").add(1)
if (d.pack.toUpperCase == "NULL") map(ei + "_packNullCN").add(1)
if (d.position.toUpperCase == "NULL") map(ei + "_positionNullCN").add(1)
ei
})
res.count()
mapBC.value.foreach(ac=>{
println(ac._1 + ": " + ac._2.value)
})
Второй метод. Я пробовал другой способ вычислить значение, создав такой аккумулятор карты.
import java.util
import java.util.Collections
import org.apache.spark.util.AccumulatorV2
import scala.collection.JavaConversions._
class CountMapAccumulator extends AccumulatorV2[String, java.util.Map[String, Long]] {
private val _map = Collections.synchronizedMap(new util.HashMap[String, Long]())
override def isZero: Boolean = _map.isEmpty
override def copy(): CountMapAccumulator = {
val newAcc = new CountMapAccumulator
_map.synchronized {
newAcc._map.putAll(_map)
}
newAcc
}
override def reset(): Unit = _map.clear()
override def add(key: String): Unit = _map.synchronized{_map.put(key, _map.get(key) + 1L)}
override def merge(other: AccumulatorV2[String, java.util.Map[String, Long]]): Unit = other match {
case o: CountMapAccumulator => for ((k, v) <- o.value) {
val oldValue = _map.put(k, v)
if (oldValue != null) {
_map.put(k, oldValue.longValue() + v)
}
// println("merge key: "+k+" old val: "+oldValue+" new Value: "+v+" current val: "+_map.get(k))
}
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
override def value: util.Map[String, Long] = _map.synchronized {
java.util.Collections.unmodifiableMap(new util.HashMap[String, Long](_map))
}
def setValue(value: Map[String, Long]): Unit = {
val newValue = mapAsJavaMap(value)
_map.clear()
_map.putAll(newValue)
}
}
затем я вызываю его следующим образом
val tmpMap = collection.mutable.Map[String, Long]()
for (ei <- eventList) {
tmpMap.put(ei + "_idNullCN", 0L)
tmpMap.put(ei + "_packNullCN", 0L)
tmpMap.put(ei + "_positionNullCN", 0L)
}
val accumulator = new CountMapAccumulator
accumulator.setValue(collection.immutable.Map[String,Long](tmpMap.toSeq:_*))
sc.register(accumulator, "CustomAccumulator")
val res = appData.map(d => {
val ei = d.eventId
if (d.id.toUpperCase == "NULL") accumulator.add(ei + "_idNullCN")
if (d.pack.toUpperCase == "NULL") accumulator.add(ei + "_packNullCN")
if (d.position.toUpperCase == "NULL") accumulator.add(ei + "_positionNullCN")
if (d.modulePos.toUpperCase == "NULL") accumulator.add(ei + "_modulePosNullCN")
ei
})
res.count()
accumulator.value.foreach(println)
но значение аккумулятора по-прежнему равно нулю
второй метод верен, так как программа заканчивается правильно, я не проверял журнал, после того, как я посмотрел, я обнаружил эту ОШИБКУ java.lang.UnsupportedOperationException: Cannot merge $line105198665522.$read$$iw$$iw$CountMapAccumulator with $line105198665522.$read$$iw$$iw$CountMapAccumulator
, поэтому я меняю код сопоставления с образцом метода слияния, подобный этому
override def merge(other: AccumulatorV2[String, java.util.Map[String, Long]]): Unit = other match {
case o: AccumulatorV2[String, java.util.Map[String, Long]] => for ((k, v) <- o.value) {
val oldValue: java.lang.Long = _map.get(k)
if (oldValue != null) {
_map.put(k, oldValue.longValue() + v)
} else {
_map.put(k, v)
}
println(s"key: ${k} oldValue: ${oldValue} newValue: ${v} finalValue: ${_map.get(k)}")
}
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}
после изменения типа o он, наконец, работает, но меня все еще смущает, что ведет себя первый способ.
mapAC
- это временный контейнер для каких-то временных результатов, в искре это не работает. - person Boris Azanov   schedule 10.12.2019