Как сериализовать класс, созданный во время выполнения, в Apache Beam

У меня есть приложение apache-beam, которое запускает конвейер как локально с прямым бегуном, так и в облаке Google с помощью бегуна потока данных. Он работает локально, но не работает с обработчиком потока данных Google.

Вот следы ошибок:

Это указывает на

"... невозможно десериализовать сериализованный DoFnInfo"

а также

"... java.lang.ClassNotFoundException: Header_H"

Я подозреваю, что это как-то связано с тем, что я использовал код bytebuddy для создания класса Header_H. Я использовал bytebuddy для создания подкласса на основе some.class в существующем исходном коде и дополнительных вводимых пользователем данных из файла конфигурации во время выполнения, т.е. Header_H становится доступным только во время выполнения.

мой код bytebuddy примерно такой:

Затем clazz (в данном случае Header_H) будет передан конвейеру в потоке данных. Когда я проверил содержимое jar-файла во временном местоположении облачной сцены Google, я вижу some.class, но не Header_H.class, и это, вероятно, вызывает ошибку «ClassNotFoundException».

builder = new ByteBuddy().subclass(some.class).name("Header_H").modifiers(PUBLIC);
       .defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L)
       .implement(Serializable.class);

Class <?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded();

Итак, если мои рассуждения верны, то как я могу заставить Beam помещать класс, созданный во время выполнения, в файл jar для отправки в средство выполнения потока данных, учитывая, что у меня есть implement(Serializable.class) в создании моего класса?

Byte Buddy может внедрить класс в файл jar через:


person bignano    schedule 16.09.2017    source источник


Ответы (2)


Это изменит существующий файл jar, чтобы включить динамически сгенерированный класс. Таким образом, вы можете изменить существующую банку, которая уже находится на пути к системному классу.

DynamicType.Unloaded<?> type = builder.make();
builder.inject(someJar);

Этот API также позволит вам создать новый jar-файл, и вы можете использовать Instrumentation API (через Java-агент), который позволяет вам добавить этот класс в качестве нового файла jar в путь к классам. Чтобы избежать подключения агента, вы также можете попробовать использовать проект byte-buddy-agent для динамического вложения.

Это будет работать:

Если динамическое вложение не разрешено в Google Cloud, вы можете решить эту проблему с помощью обычного вложения в командной строке.

File someFolder = ...
File jar = builder.saveIn(someFolder);
ByteBuddyAgent.install().appendToSystemClassLoaderSearch(new JarFile(jar));

Средство выполнения потока данных не контролирует содержимое ваших JAR-файлов - он только анализирует путь к классам вашей программы, считывает JAR-файлы с диска и копирует их в промежуточный каталог вашего конвейера на GCS. Прямо сейчас Beam не предоставляет способ доставки классов, которые не содержатся в JAR в вашем пути к классам.

person Rafael Winterhalter    schedule 18.09.2017
comment
Спасибо @jkff. давайте посмотрим, может ли кто-нибудь помочь прояснить ситуацию со стороны Bytebuddy. - person bignano; 18.09.2017

Вероятно, вам нужно будет найти способ использовать только классы из этих JAR в спецификации вашего конвейера, однако вы, конечно, все равно можете использовать ByteBuddy в своем DoFn или другом коде, который выполняется на рабочих локально. Но обратите внимание, что все, что будет отправлено между рабочими (например, содержимое PCollection), по-прежнему должно быть сериализуемым (сериализуемым для одного рабочего и десериализуемым на другом) или иметь Coder.

В качестве альтернативы может быть способ заставить ByteBuddy создавать JAR и динамически добавлять его в путь к классам вашей программы. Это может сработать, но это вопрос, относящийся к ByteBuddy, и я недостаточно знаком с ByteBuddy, чтобы сказать, как это сделать.

Вы имеете в виду _1_ и _2_ в приведенном выше коде?

person jkff    schedule 16.09.2017
comment
(9938ce94c0752c7): java.lang.RuntimeException: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: невозможно десериализовать сериализованный DoFnInfo в com.google.com cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply (MapTaskExecutorFactory.java:283) в com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply (MapTaskExecutorFactory.java:253) в com.google.cloud. graph.Networks $ TypeSafeNodeFunction.apply (Networks.java:55) в com.google.cloud.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply (Networks.java:43) в com.google.cloud.dataflow.worker. graph.Networks.replaceDirectedNetworkNodes (Networks.java:78) в com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create (MapTaskExecutorFactory.java:142) в com.google.cloud.dataflow.worker.DataflowWorker.doWork. (DataflowWorker.doWork. java: 271) в com.google.cloud.dataflow.worker.DataflowWorker.getA ndPerformWork (DataflowWorker.java:244) по адресу com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness $ WorkerThread.doWork (DataflowBatchWorkerHarness.java:135) по адресу com.google.cloud.dataflow.worker.DataflowBatchWorkerThreadCarness $. java: 115) в com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness $ WorkerThread.call (DataflowBatchWorkerHarness.java:102) в java.util.concurrent.FutureTask.run (FutureTask.java:266) в java.concur.concur .ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:745)
: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: невозможно десериализовать сериализованный DoFnInfo на com.google.cloud.dataflow.worker.repackaged.com .google.common.cache.LocalCache $ Segment.get (LocalCache .java: 2214) на com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get (LocalCache.java:4053) на com.google.cloud.dataflow.worker.repackaged.com .google.common.cache.LocalCache $ LocalManualCache.get (LocalCache.java:4899) в com.google.cloud.dataflow.worker.UserParDoFnFactory.create (UserParDoFnFactory.java:95) в com.google.cloud.dataflow.worker .DefaultParDoFnFactory.create (DefaultParDoFnFactory.java:66) в com.google.cloud.dataflow.worker. MapTaskExecutorFactory.createParDoOperation (MapTaskExecutorFactory.java:360) в com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply (MapTaskExecutorFactory.java:271) ... еще 14
не удалось для десериализации сериализованного DoFnInfo в org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray (SerializableUtils.java:75) в com.google.cloud.dataflow.worker.UserParDoFnFactory $ UserDoFnExtractor.com.google.clouddataflow.worker.UserParDoFnFactory .google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call (UserParDoFnFactory.java:100) в com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call (UserParDoFnFactory.java:97) в com.google.cloud.data. .worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache $ 1.load (LocalCache.java:4904) в com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LoadingValueReference .loadFuture (LocalCache.java:3628) на com.google.clou d.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.loadSync (LocalCache.java:2336) в com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.lockedGetOrLoad (LocalCache.java:2295) в com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.get (LocalCache.java:2208) ... еще 20 < br> Вызвано: java.lang.ClassNotFoundException: Header_H в java.net.URLClassLoader.findClass (URLClassLoader.java:381) в java.lang.ClassLoader.loadClass (ClassLoader.java:424) в sun.misc.assLauncher $ AppClass .loadClass (Launcher.java:331) в java.lang.ClassLoader.loadClass (ClassLoader.java:357) в java.lang.Class.forName0 (собственный метод) в java.lang.Class.forName (Class.java:348 ) в java.io.ObjectInputStream.resolveClass (ObjectInputStream.java:628) в java.io.ObjectInputStream.readNonProxyDesc (ObjectInputStream.java:1620) в java.io.ObjectInputStream.readClassDesc (ObjectInputStream. java: 1521) в java.io.ObjectInputStream.readClass (ObjectInputStream.java:1486) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java:1336) в java.io.ObjectInputStream.defaultReadFields (ObjectInputStream). java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:1942) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:1808) в java.io.ObjectInputStream.readObject0 (ObjectInputStream.java: Object: Object: ObjectInputStream.defaultReadFields (ObjectInputStream.java:2018) в java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:1942) в java.io.ObjectInputStream.readOrdinaryObject (ObjectInputStream.java:1808) в java.io. .java: 1353) в java.io.ObjectInputStream.readObject (ObjectInputStream.java:373) в org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray (SerializableUtils.java:72) ... еще 28 - person bignano; 17.09.2017