Как я могу передать аккумулятор Spark функции?

Я хочу сделать что-то вроде этого.

val ac = sc.accumulator(0)
....
a = a.map(x => someFunction(x, the_accumulator_object))
....

Что должно быть вместо the_accumulator_ojbect в приведенном выше коде? Было бы нормально написать там ac?

Также в функции

def someFunction(x: TypeOfX, a: TypeOfAccumulator) : ReturnType =
{
    .....
}

Что должно быть на месте TypeOfAccumulator в функции выше?


person pythonic    schedule 03.08.2016    source источник


Ответы (1)


Дополнительную информацию об аккумуляторах Spark можно найти здесь

Согласно scala-docs относительно создания аккумулятора:

/** * Создать переменную [[org.apache.spark.Accumulator]] заданного типа с именем для отображения * в пользовательском интерфейсе Spark. Задачи могут «добавлять» значения в аккумулятор с помощью метода +=. Только водитель * может получить доступ к value аккумулятора. */

Тип аккумулятора по умолчанию — int. Вы можете установить свой собственный тип, но вам нужно правильно реализовать метод += для добавления значений к вашему собственному типу аккумулятора:

val ac = sc.accumulator[MyOwnType](MyOwnTypeObject, "my own type object accumulator")

Ваш основной фрагмент кода будет выглядеть так:

val ac = sc.accumulator(0, "some accumulator")
....
a = a.map(x => someFunction(x, ac))
....
System.out.println("My accumulator value is: " + ac.value)

Где имплантация метода someFunction будет выглядеть так:

def someFunction(x: TypeOfX, ac: Accumulator[Int]) : ReturnType =
{
    ...
    ac += 1
    ...
}
person Avihoo Mamka    schedule 03.08.2016
comment
Что это за второй параметр в объявлении аккумулятора? Это для идентификации разных аккумуляторов? Во-вторых, могу ли я использовать любой тип с аккумулятором. Например, я хочу использовать с ним Set type. - person pythonic; 03.08.2016
comment
Отредактировал ответ с ответами на ваши вопросы. - person Avihoo Mamka; 03.08.2016
comment
Прохладный. Спасибо. Я попробую ваши решения. - person pythonic; 03.08.2016
comment
Я пытаюсь это сделать, но не получается -> val ac = sc.accumulator[Set[Integer]](Set.empty[Integer], set1_acc), говоря, что не удалось найти неявное значение для параметра param: org.apache.spark .AccumulatorParam[scala.collection.multable.Set[Integer]]. Любая идея, какую ошибку я здесь делаю? - person pythonic; 03.08.2016
comment
Попробуйте сделать: val ac = sc.accumulator[Set[Integer]](Set[Integer](), "set1_acc") - person Avihoo Mamka; 03.08.2016