значение искрового аккумулятора отличается внутри RDD и снаружи RDD

мой аккумулятор представляет собой массив [массив [Int]] после обновления аккумулятора в операции foreach RDD, аккумулятор (0), как и ожидалось, где аккумулятор (1) представляет собой массив (0,0,0), который полностью потерян

внутри RDD значение аккумулятора равно Array(Array(4,5,6),Array(4,5,6)) вне RDD, значение аккумулятора равно Array(Array(4,5,6),Array(0,0,0) ))

ниже код

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     println("accumulator value in rdd is"+accumulator.localValue)
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc

  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    while (j < columnLength) {
      var i =0
    while (i < rowLength) {
         val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
      }


}

результаты: внутри RDD значение накопителя равно Array(Array(4,5,6),Array(4,5,6)) вне RDD, значение аккумулятора равно Array(Array(4,5,6),Array(0,0) ,0))

но то, что я ожидаю за пределами RDD, это массив (массив (4,5,6), массив (4,5,6))


person nagSumanth    schedule 08.12.2014    source источник
comment
Эта проблема решена?   -  person Vijay Innamuri    schedule 10.12.2014


Ответы (3)


Метод addAccumulator вызывается всякий раз, когда происходит обновление переменной accumulator.variable.

В приведенном выше коде аккумулятор += (x(0),0,0) вызывает метод addAccumulator.

после завершения всех задач вызывается метод addInPlace для объединения накопленных значений всех задач.

В приведенном выше коде initialValue Array(1, 1, 1)Array(1, 1, 1) и значение Task Accumulator Array(4, 5, 6) Array(4, 5, 6) вызывает метод addInPlace.

В приведенном выше коде переменная i в методе addInPlace должна сбрасываться всякий раз, когда она входит в цикл while (j ‹ columnLength) {

Следующий код работает как шарм.

            while (j < columnLength) {
              i=0
                while (i < rowLength) {
                  println("m1(j)(i)"+ m1(j)(i))
                  println(" m2(j)(i))"+ m2(j)(i))
                    val a = Math.max(m1(j)(i), m2(j)(i))
                            updatedMatrix(j)(i) = a
                            i += 1
                } 
                j += 1
            }
person Vijay Innamuri    schedule 09.12.2014
comment
Я обнаружил, что результат тот же - person Tim; 10.11.2015

localValue должен быть другим, согласно документу:

  • Это НЕ глобальное значение аккумулятора. Чтобы получить глобальное значение после
  • завершил операцию над набором данных, вызовите value. *
  • Типичное использование этого метода заключается в непосредственном изменении локального значения, например, для добавления
  • элемент в Set. */
person tribbloid    schedule 19.12.2014

Я не обнаружил разницы с изменением var i=0 на i=0, а конечным результатом является Array(Array(4,5,6),Array(4,5,6))

Вывод приложения извлекается из журналов пряжи -applicationId.

Код:

import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object acc {
  def main(args: Array[String]) {
     //val conf = new SparkConf().setAppName("Simple Application")
  val conf = new SparkConf()
  conf.setSparkHome("/usr/lib/spark")
  conf.setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val a =Array(Array(1,2,3),Array(4,5,6))
  val rdd = sc.parallelize(a)
  val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
  val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
  rdd.foreach{x=>
     accumulator += (x(0),0,0)
     accumulator += (x(1),0,1)
     accumulator += (x(2),0,2)
     accumulator += (x(0),1,0)
     accumulator += (x(1),1,1)
     accumulator += (x(2),1,2)
     val columnLength: Int = accumulator.localValue.length
     val rowLength: Int = accumulator.localValue(0).length
     var j: Int = 0
     var i: Int = 0
     println("accumulator")
     while(j < columnLength){
        i =0 
        while(i<rowLength){
            println(accumulator.localValue(j)(i))
            i += 1
        }
        j+=1
     }
     println("accumulator value in rdd is"+accumulator.localValue)
     }
     val columnLength: Int = accumulator.value.length
     val rowLength: Int = accumulator.value(0).length
     var j: Int = 0
     var i: Int = 0
     println("total")
     while(j < columnLength){
        i =0 
        while(i<rowLength){
            println(accumulator.value(j)(i))
            i += 1
        }
        j+=1
     }

  println("accumulator value out of rdd is :" + accumulator.value )

  }

}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int,   Int)] {

  def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
    initialValue
  }

  def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {

    acc(value._2)(value._3) = value._1
    acc
  }

   def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
    val columnLength: Int = m1.length
    val rowLength: Int = m1(0).length
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)

    var j: Int = 0
    var i: Int = 0
    while (j < columnLength) {
    i =0
    while (i < rowLength) {
        println("m1("+j+")("+i+")="+ m1(j)(i) + " m2("+j+")("+i+")="+ m2(j)(i))
        val a = Math.max(m1(j)(i), m2(j)(i))
        updatedMatrix(j)(i) = a
        i += 1
      } 
      j += 1
    }

    updatedMatrix
  }
}

И результат:

accumulator
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6

total
4
5
6
4
5
6

И измените код на это:

    //var i: Int = 0
    while (j < columnLength) {
    var i =0

И результат:

m1(0)(0)=1 m2(0)(0)=1
m1(0)(1)=1 m2(0)(1)=2
m1(0)(2)=1 m2(0)(2)=3
m1(1)(0)=1 m2(1)(0)=1
m1(1)(1)=1 m2(1)(1)=2
m1(1)(2)=1 m2(1)(2)=3
m1(0)(0)=1 m2(0)(0)=4
m1(0)(1)=2 m2(0)(1)=5
m1(0)(2)=3 m2(0)(2)=6
m1(1)(0)=1 m2(1)(0)=4
m1(1)(1)=2 m2(1)(1)=5
m1(1)(2)=3 m2(1)(2)=6
total
4
5
6
4
5
6

accumulator
1
2
3
1
2
3

accumulator
4
5
6
4
5
6

Конечный результат тот же.

Но у меня есть два вопроса:

  • Я не знаю, почему два порядка вывода не совпадают.
  • Why the addInplace function be called twice?
    • I think I know why this function be called twice, but I'm not sure
      • initialize: Array(Array(1,1,1),Array(1,1,1)
      • вывод задачи: Массив(Массив(1,2,3),Массив(1,2,3)
      • вывод другой задачи: Массив(Массив(4,5,6),Массив(4,5,6)

@Виджай Иннамури

person Tim    schedule 10.11.2015