Зарегистрируйте пользовательские классы для сериализации Kryo в Beam Spark runner

Я видел, что бегун Beam Spark использует BeamSparkRunnerRegistrator для крио-регистрации. Есть ли способ зарегистрировать пользовательские классы пользователей?


person azagrebin    schedule 16.08.2017    source источник


Ответы (3)


Есть способ сделать это, но сначала могу я спросить, почему вы хотите это сделать?

Вообще говоря, Spark runner от Beam использует кодировщики Beam для сериализации пользовательских данных.

В настоящее время у нас есть ошибка, из-за которой кэшированные DStream сериализуются с помощью Kryo, и если пользовательские классы не сериализуемы Kryo, это не удается. BEAM-2669. В настоящее время мы пытаемся решить эту проблему.

Если вы столкнулись с этой проблемой, в настоящее время вы можете обойти ее, используя регистратор Kryo. Это проблема, с которой вы столкнулись? или у вас есть другая причина для этого, пожалуйста, дайте мне знать.

В любом случае, вот как вы можете предоставить свой собственный экземпляр JavaSparkContext для бегуна Beam Spark с помощью SparkContextOptions

SparkConf conf = new SparkConf();
conf.set("spark.serializer", KryoSerializer.class.getName());
conf.set("spark.kryo.registrator", "my.custom.KryoRegistrator");

JavaSparkContext jsc = new JavaSparkContext(..., conf);

SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);

Pipeline p = Pipeline.create(options);

Для получения дополнительной информации см.:

документация по Beam Spark runner

Пример: ProvidedSparkContextTest.java

person aviemzur    schedule 17.08.2017
comment
Проблема была с потреблением памяти. У меня было несколько простых классов POJO с парой примитивных полей Java. Когда я начал использовать их очень простую сериализацию adhoc String для передачи объекта между преобразованиями, использование памяти резко сократилось. Крио был активирован в искровом режиме. Вот почему я подумал, что String, вероятно, очень эффективно сериализован, а мои простые классы — нет, потому что они не были зарегистрированы. Означает ли это, что spark runner сериализует пользовательские объекты с помощью beam в свои внутренние структуры данных, а затем они сериализуются kryo в spark? - person azagrebin; 19.09.2017
comment
Для всех классов пользовательских данных мы сериализуем их в байты с помощью кодеров Beam, оттуда Spark использует Kryo для сериализации данных, однако, поскольку данные уже находятся в байтовой форме, Kryo не сериализует их, а передает байты как есть. - person aviemzur; 08.10.2017

Создайте свой собственный KryoRegistrator с помощью этого пользовательского сериализатора

package Mypackage
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[A], new CustomASerializer())
}}

Затем добавьте запись конфигурации об этом с полным именем вашего регистратора, например. Мой пакет.Мой регистратор:

val conf = new SparkConf()
conf.set("spark.kryo.registrator", "Mypackage.KryoRegistrator")

См. документацию: Spark сериализации данных

person Indrajit Swain    schedule 16.08.2017
comment
ОП спрашивает о бегуне Beam Spark, а не о самой Spark. - person aviemzur; 17.08.2017

Если вы не хотите регистрировать свои классы, сериализация Kryo все равно будет работать, но ей придется хранить полное имя класса с каждым объектом, что расточительно.

person Zahid Maqbool    schedule 19.09.2019