Я новичок в Apache Beam и пытаюсь запустить образец программы чтения и записи с помощью DirectRunner и DataflowRunner. В моем случае использования есть несколько аргументов CLI, и для этого я создал один интерфейс CustomOptions.java, расширяющий PipelineOptions.
При использовании DirectRunner программы работают нормально, но с DataflowRunner говорится, что «в интерфейсе CustomOptions отсутствует свойство с именем« проект »».
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.2.0</version>
<type>maven-plugin</type>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.16.0</version>
</dependency>
</dependencies>
CustomOptions.java (Интерфейс)
import org.apache.beam.sdk.options.PipelineOptions;
public interface CustomOptions extends PipelineOptions {
String getInput();
void setInput(String value);
String getOutput();
void setOutput(String value);
}
WordCount.java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
public class WordCount {
public static void main(String args[]) {
PipelineOptionsFactory.register(CustomOptions.class);
CustomOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("Read", TextIO.read().from(options.getInput()))
.apply("Write", TextIO.write().to(options.getOutput()));
p.run();
}
}
Команды:
DirectRunner (Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath
DataflowRunner (Not Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath --runner=DataflowRunner --stagingLocation=gs://<tmp_path> --project=<projectId>
Ошибка:
Exception in thread "main" java.lang.IllegalArgumentException: Class interface CustomOptions missing a property named 'project'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1625)
at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:115)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:298)
at WordCount.main(WordCount.java:13)
Во-вторых, я попытался расширить CustomOptions с помощью DataflowPipelineOptions вместо PipelineOptions. Используя это также, я получаю сообщение об ошибке:
Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme gs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:215)
at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:734)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1069)
at WordCount.main(WordCount.java:15)
Во второй пробной версии возникает еще один вопрос: тот же код нельзя выполнить с помощью DirectRunner и DataflowRunner. Потому что во втором случае «projectId» - это обязательный аргумент, который не будет указываться в DirectRunner.