искра aparch, исключение NotSerializableException: org.apache.hadoop.io.Text

вот мой код:

  val bg = imageBundleRDD.first()    //bg:[Text, BundleWritable]
  val res= imageBundleRDD.map(data => {
                                val desBundle = colorToGray(bg._2)        //lineA:NotSerializableException: org.apache.hadoop.io.Text
                                //val desBundle = colorToGray(data._2)    //lineB:everything is ok
                                (data._1, desBundle)
                             })
  println(res.count)

lineB проходит нормально, но lineA показывает, что:org.apache.spark.SparkException: Задание прервано: Задача не сериализуема: java.io.NotSerializableException: org.apache.hadoop.io.Text

Я пытаюсь использовать Kryo для решения своей проблемы, но, похоже, ничего не изменилось:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
       kryo.register(classOf[Text])
       kryo.register(classOf[BundleWritable])
  }
}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

Спасибо!!!


person Hellen    schedule 12.01.2014    source источник
comment
Дубликат см. stackoverflow.com/a/22594142/1586965   -  person samthebest    schedule 02.07.2014
comment
Используйте этот ответ stackoverflow.com/a/25270600/1586965   -  person samthebest    schedule 11.12.2014


Ответы (3)


У меня была аналогичная проблема, когда мой код Java читал файлы последовательности, содержащие текстовые ключи. Я нашел этот пост полезным:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-solve-java-io-NotSerializableException-org-apache-hadoop-io-Text-td2650.html

В моем случае я преобразовал текст в строку, используя карту:

JavaPairRDD<String, VideoRecording> mapped = videos.map(new PairFunction<Tuple2<Text,VideoRecording>,String,VideoRecording>() {
    @Override
    public Tuple2<String, VideoRecording> call(
            Tuple2<Text, VideoRecording> kv) throws Exception {
        // Necessary to copy value as Hadoop chooses to reuse objects
        VideoRecording vr = new VideoRecording(kv._2);
        return new Tuple2(kv._1.toString(), vr);
    }
});

Помните об этом примечании в API для метода sequenceFile в JavaSparkContext:

Примечание. Поскольку класс RecordReader в Hadoop повторно использует один и тот же объект Writable для каждой записи, непосредственное кэширование возвращенного RDD приведет к созданию множества ссылок на один и тот же объект. Если вы планируете напрямую кэшировать записываемые объекты Hadoop, вам следует сначала скопировать их с помощью функции отображения.

person Madhu    schedule 05.04.2014

В Apache Spark при работе с файлами последовательности мы должны следовать следующим методам:

 -- Use Java equivalent Data Types in place of Hadoop data types.
 -- Spark Automatically converts the Writables into Java equivalent Types.

Ex:- We have a sequence file "xyz", here key type is say Text and value
is LongWritable. When we use this file to create an RDD, we need use  their 
java  equivalent data types i.e., String and Long respectively.

 val mydata = = sc.sequenceFile[String, Long]("path/to/xyz")
 mydata.collect

person Naga    schedule 16.12.2016

Причина, по которой ваш код имеет проблему с сериализацией, заключается в том, что ваша настройка Kryo, хотя и близка, не совсем верна:

сдача:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")
val sc = new SparkContext(...

to:

val sparkConf = new SparkConf()
  // ... set master, appname, etc, then:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")

val sc = new SparkContext(sparkConf)
person Community    schedule 31.08.2015