Разделение выходных таблиц BigQuery

Я прочитал как из документации, так и из этого ответа, что можно динамически определить назначение таблицы. Я использовал точно такой же подход, как показано ниже:

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

Однако я получаю следующую ошибку компиляции:

The method to(String) in the type BigQueryIO.Write<Object> is not applicable for the arguments (new SerializableFunction<ValueInSingleWindow<Foo>,TableDestination>(){})

Любая помощь будет оценена по достоинству.

Изменить для разъяснения того, как я использую окна в моем случае:

PCollection<Foo> validFoos = ...;           
PCollection<TableRow> validRows = validFoos.apply(ParDo.named("Convert Foo to table row")
        .of(new ConvertToValidTableRowFn()))
        .setCoder(TableRowJsonCoder.of());
TableSchema validSchema = ConvertToValidTableRowFn.getSchema();    

validRows.apply(Window.<TableRow>into(CalendarWindows.days(1))).apply(BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            @Override
            public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                TableRow t = value.getValue();
                String fooName = ""; // get name from table
                TableDestination td = new TableDestination(
                        "my-project:dataset.table$" + fooName, "");
                return td;
            }
        }));

В этом случае я получил следующую ошибку The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>).


person Ali    schedule 31.05.2017    source источник
comment
Какую версию SDK вы используете? Из другого поста: Эта функция будет включена в первый стабильный выпуск Apache Beam и в следующий выпуск Dataflow SDK (который будет основан на первом стабильном выпуске Apache Beam). Прямо сейчас вы можете использовать это, запустив свой конвейер против моментального снимка Beam at HEAD из github.   -  person Graham Polley    schedule 01.06.2017
comment
Я использую стабильную версию Apache Beam 2.0.0, выпущенную в мае. В его документации сказано, что эта функция включена. См. раздел о сегментировании в эта документация.   -  person Ali    schedule 01.06.2017
comment
Я только что наткнулся на эту свежую запись в блоге, посвященную этому . Он имеет некоторую разницу в синтаксисе (возвращает TableReferences вместо TableDestinations) и разделяет код на класс (делает его немного чище). Я не проверял это сам (и я использовал код, аналогичный вашему в прошлом), но я надеюсь, что это поможет.   -  person Matthias Baetens    schedule 01.06.2017
comment
Я очень быстро попробовал это сам с вашим кодом, и, похоже, он не дает мне никаких ошибок. Можете ли вы проверить свой POM и убедиться, что ваша версия BEAM 2.1.0-SNAPSHOT?   -  person Matthias Baetens    schedule 01.06.2017


Ответы (1)


Я считаю, что ошибка компиляции возникает из-за того, что вы выполняете эту операцию на PCollection<Foo>, хотя на самом деле она ожидает оконные значения. Таким образом, вы должны сначала использовать .apply(Window.<Foo>into(...)), а затем определить назначение таблицы на основе вашего окна.

Вы можете увидеть примеры в этот ответ или этот ответ, а также в документация, которую вы упомянули.

person Alexey    schedule 01.06.2017
comment
Спасибо за ответ, но когда я применяю это оконное управление, точно описанное в этот ответ, на этот раз я получаю The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>) - person Ali; 01.06.2017
comment
Вы должны изменить код на (Window.<Foo>into(..)), потому что в вашем случае у вас есть PCollection<Foo> и вы используете более общий метод write(), а код ответа использует методы PCollection<TableRow> и writeTableRows(). - person Alexey; 01.06.2017
comment
Да, но в данном случае я преобразовал PCollection<Foo> foos в PCollection<TableRow> fooRows до того, как использовал эту функцию. - person Ali; 01.06.2017
comment
Можно попробовать фрагмент из этот документ, то есть: PCollection<TableRow> quotes = ... quotes.apply(Window.<TableRow>into(CalendarWindows.days(1))) .apply(BigQueryIO.writeTableRows() .withSchema(schema) .to(new SerializableFunction<ValueInSingleWindow, String>() { public String apply(ValueInSingleWindow value) { ... } })); Какая часть кода не компилируется для вас? - person Alexey; 01.06.2017
comment
Я также сделал это, но я получаю ту же ошибку поверх первого apply. Он жалуется как The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>) - person Ali; 01.06.2017