Я видел, что бегун Beam Spark использует BeamSparkRunnerRegistrator
для крио-регистрации. Есть ли способ зарегистрировать пользовательские классы пользователей?
Зарегистрируйте пользовательские классы для сериализации Kryo в Beam Spark runner
Ответы (3)
Есть способ сделать это, но сначала могу я спросить, почему вы хотите это сделать?
Вообще говоря, Spark runner от Beam использует кодировщики Beam для сериализации пользовательских данных.
В настоящее время у нас есть ошибка, из-за которой кэшированные DStream
сериализуются с помощью Kryo, и если пользовательские классы не сериализуемы Kryo, это не удается. BEAM-2669. В настоящее время мы пытаемся решить эту проблему.
Если вы столкнулись с этой проблемой, в настоящее время вы можете обойти ее, используя регистратор Kryo. Это проблема, с которой вы столкнулись? или у вас есть другая причина для этого, пожалуйста, дайте мне знать.
В любом случае, вот как вы можете предоставить свой собственный экземпляр JavaSparkContext
для бегуна Beam Spark с помощью SparkContextOptions
SparkConf conf = new SparkConf();
conf.set("spark.serializer", KryoSerializer.class.getName());
conf.set("spark.kryo.registrator", "my.custom.KryoRegistrator");
JavaSparkContext jsc = new JavaSparkContext(..., conf);
SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);
Pipeline p = Pipeline.create(options);
Для получения дополнительной информации см.:
документация по Beam Spark runner
Пример: ProvidedSparkContextTest.java
Создайте свой собственный KryoRegistrator
с помощью этого пользовательского сериализатора
package Mypackage
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[A], new CustomASerializer())
}}
Затем добавьте запись конфигурации об этом с полным именем вашего регистратора, например. Мой пакет.Мой регистратор:
val conf = new SparkConf()
conf.set("spark.kryo.registrator", "Mypackage.KryoRegistrator")
См. документацию: Spark сериализации данных
Если вы не хотите регистрировать свои классы, сериализация Kryo все равно будет работать, но ей придется хранить полное имя класса с каждым объектом, что расточительно.