мой аккумулятор представляет собой массив [массив [Int]] после обновления аккумулятора в операции foreach RDD, аккумулятор (0), как и ожидалось, где аккумулятор (1) представляет собой массив (0,0,0), который полностью потерян
внутри RDD значение аккумулятора равно Array(Array(4,5,6),Array(4,5,6)) вне RDD, значение аккумулятора равно Array(Array(4,5,6),Array(0,0,0) ))
ниже код
import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object acc {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val a =Array(Array(1,2,3),Array(4,5,6))
val rdd = sc.parallelize(a)
val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
rdd.foreach{x=>
accumulator += (x(0),0,0)
accumulator += (x(1),0,1)
accumulator += (x(2),0,2)
accumulator += (x(0),1,0)
accumulator += (x(1),1,1)
accumulator += (x(2),1,2)
println("accumulator value in rdd is"+accumulator.localValue)
}
println("accumulator value out of rdd is :" + accumulator.value )
}
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {
def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
initialValue
}
def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
acc(value._2)(value._3) = value._1
acc
}
def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
val columnLength: Int = m1.length
val rowLength: Int = m1(0).length
var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)
var j: Int = 0
while (j < columnLength) {
var i =0
while (i < rowLength) {
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
updatedMatrix
}
}
результаты: внутри RDD значение накопителя равно Array(Array(4,5,6),Array(4,5,6)) вне RDD, значение аккумулятора равно Array(Array(4,5,6),Array(0,0) ,0))
но то, что я ожидаю за пределами RDD, это массив (массив (4,5,6), массив (4,5,6))