Программирование Spark Scala для несериализуемых объектов и функций

У меня есть исключение «Задача не сериализуемая», когда я запускаю программу Spark Scala с

  • Spark RDD не имеет сериализуемого типа (класс Java)
  • вызываемые функции относятся к несериализуемому классу (опять же класс Java)

мой код примерно такой

object Main{
    def main(args : Array(String){
        ...
        var rdd = sc.textFile(filename)
                  .map(line => new NotSerializableJClass(line)).cache() 
        //rdd is RDD[NotSerializableJClass]
        ...
        var test = new NotSerializableJPredicate()
        rdd = rdd.filter(elem => test.test(elem))
        //throws TaskNotSerializable on test Predicate class
    }
}

Я заметил, что могу решить вторую часть с помощью

rdd = rdd.filter(elem => (new NotSerializableJPredicate()).test(elem))

но я все еще получаю это исключение для класса объектов в RDD. И я бы по-другому и вторую часть по-другому просто потому, что я не хочу создавать большое количество объектов PredicateClass.

Можете вы помочь мне? Как я могу продвигаться вперед с несериализуемым классом?


person Andrean    schedule 08.04.2017    source источник
comment
Является ли NotSerializableJClass сторонним классом или классом, определенным в вашем приложении?   -  person himanshuIIITian    schedule 08.04.2017


Ответы (2)


RDD должны быть сериализуемыми, поэтому вы не можете создать RDD несериализуемого класса.

Для вашего предиката вы можете написать его с помощью mapPartitions.

rdd.mapPartitions{
  part => 
    val test = new NotSerializableJPredicate()
    part.filter{elem => test.test(elem)}
   }

mapPartitons будет запускаться один раз для каждого раздела, поэтому он позволяет вам создавать экземпляры несериализуемых классов в исполнителе, но это нужно делать только один раз для раздела, а не для каждой записи.

person puhlen    schedule 10.04.2017

Некоторые из общих правил, которые помогли мне избежать проблем с сериализацией задач:

Если вы вызываете метод любого класса из своего кода, Spark потребуется сериализовать весь класс, содержащий метод. Способы обхода могут быть любыми из следующих: a> Объявите метод как функциональную переменную в NotSerializableClass; поэтому вместо записи: def foo(x:Int)={бла-бла} попробуйте использовать val foo = (x:Int)=>{бла-бла} Итак; spark больше не нужно сериализовать весь класс. b> Рефакторинг кода для извлечения соответствующих частей в отдельный класс может быть правильным решением в некоторых случаях. c>Отметьте объекты в классе, которые на самом деле не нужны для задания, как @transient и отметьте класс Serializable

person sourabh    schedule 10.04.2017