Почему CustomOptions в Apache Beam не наследует свойства DataflowPipelineOptions по умолчанию?

Я новичок в Apache Beam и пытаюсь запустить образец программы чтения и записи с помощью DirectRunner и DataflowRunner. В моем случае использования есть несколько аргументов CLI, и для этого я создал один интерфейс CustomOptions.java, расширяющий PipelineOptions.

При использовании DirectRunner программы работают нормально, но с DataflowRunner говорится, что «в интерфейсе CustomOptions отсутствует свойство с именем« проект »».

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.2.0</version>
        <type>maven-plugin</type>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

CustomOptions.java (Интерфейс)

import org.apache.beam.sdk.options.PipelineOptions;

public interface CustomOptions extends PipelineOptions {

    String getInput();
    void setInput(String value);

    String getOutput();
    void setOutput(String value);
}

WordCount.java

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCount {

    public static void main(String args[]) {
        PipelineOptionsFactory.register(CustomOptions.class);
        CustomOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("Read", TextIO.read().from(options.getInput()))
                .apply("Write", TextIO.write().to(options.getOutput()));

        p.run();
    }
}

Команды:

DirectRunner (Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath
DataflowRunner (Not Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath --runner=DataflowRunner --stagingLocation=gs://<tmp_path> --project=<projectId>

Ошибка:

Exception in thread "main" java.lang.IllegalArgumentException: Class interface CustomOptions missing a property named 'project'.
    at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1625)
    at org.apache.beam.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:115)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:298)
    at WordCount.main(WordCount.java:13)

Во-вторых, я попытался расширить CustomOptions с помощью DataflowPipelineOptions вместо PipelineOptions. Используя это также, я получаю сообщение об ошибке:

Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme gs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:215)
    at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:734)
    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1069)
    at WordCount.main(WordCount.java:15)

Во второй пробной версии возникает еще один вопрос: тот же код нельзя выполнить с помощью DirectRunner и DataflowRunner. Потому что во втором случае «projectId» - это обязательный аргумент, который не будет указываться в DirectRunner.


person Jitesh Sharma    schedule 26.11.2019    source источник
comment
В первом случае вы можете удалить --project = ‹projectId›   -  person Jayadeep Jayaraman    schedule 26.11.2019
comment
Чтобы прояснить, вы расширяли DataflowPipelineOptions и запускали его в DataflowRunner, когда увидели, что файловая система не найдена для ошибки схемы gs? Я бы не ожидал появления этой ошибки, если вы расширяете DataflowPipelineOptions. Не могли бы вы пояснить (1) какую из двух командных строк вы использовали и (2) какой класс параметров вы расширяли, когда увидели эту ошибку?   -  person Alex Amato    schedule 27.11.2019
comment
Я не уверен на 100%, можно ли использовать DataflowPipelineOptions с DirectRunner. Если требуется передать такие параметры, как --project в DirectRunner, это может сработать, если вы передадите неиспользуемое значение-заполнитель. Хотя я думаю, что параметр --project используется для источников и приемников, если они читают / записывают данные в службу GCP. В этом случае вам нужно будет указать допустимое значение. Если это не удастся, у вас могут быть две основные программы, которые меняют класс параметров, на DataflowRunner и DirectRunner.   -  person Alex Amato    schedule 27.11.2019
comment
@JayadeepJayaraman Если я удалю --project = ‹projectId›, это вызовет еще одно исключение для ключа --stagingLocation = ‹stagingLocation›. В нем говорится, что CustomOptions.java не имеет ключевого stagingLocation.   -  person Jitesh Sharma    schedule 28.11.2019
comment
@AlexAmato Да, вы правильно поняли, файловая система не найдена для схемы gs. Ошибка возникает, когда я расширяю DataflowPipelineOptions. И я использую обе команды, одну с DirectRunner, а другую с DataflowRunner.   -  person Jitesh Sharma    schedule 28.11.2019


Ответы (1)


После нескольких проб и ошибок, я думаю, что поступил правильно. Я использую те же классы java, которые упоминались в вопросе, т.е. расширяю CustomOptions.java с помощью PipelineOptions. Единственное изменение, которое я сделал, было в pom.xml.

Теперь я использую плагин maven shade с небольшой дополнительной конфигурацией вместо плагина сборки maven. С этим я добился: 1. Тот же самый jar можно использовать с DirectRunner или DataflowRunner. 2. Указание, какой основной класс я хочу выполнить из командной строки.

Предыдущий 'pom.xml':

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- bind to the packaging phase -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <!-- add Main-Class to manifest file -->
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.dh.WordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.2.0</version>
        <type>maven-plugin</type>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

Новый 'pom.xml':

<build>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

<dependencies>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>2.16.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.16.0</version>
    </dependency>

</dependencies>

Это стало возможным, когда я прочитал этот ответ: Google Dataflow Файловая система не найдена для схема gs

person Jitesh Sharma    schedule 28.11.2019