Я прочитал как из документации, так и из этого ответа, что можно динамически определить назначение таблицы. Я использовал точно такой же подход, как показано ниже:
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
Однако я получаю следующую ошибку компиляции:
The method to(String) in the type BigQueryIO.Write<Object> is not applicable for the arguments (new SerializableFunction<ValueInSingleWindow<Foo>,TableDestination>(){})
Любая помощь будет оценена по достоинству.
Изменить для разъяснения того, как я использую окна в моем случае:
PCollection<Foo> validFoos = ...;
PCollection<TableRow> validRows = validFoos.apply(ParDo.named("Convert Foo to table row")
.of(new ConvertToValidTableRowFn()))
.setCoder(TableRowJsonCoder.of());
TableSchema validSchema = ConvertToValidTableRowFn.getSchema();
validRows.apply(Window.<TableRow>into(CalendarWindows.days(1))).apply(BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
TableRow t = value.getValue();
String fooName = ""; // get name from table
TableDestination td = new TableDestination(
"my-project:dataset.table$" + fooName, "");
return td;
}
}));
В этом случае я получил следующую ошибку The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>)
.
TableReference
s вместоTableDestination
s) и разделяет код на класс (делает его немного чище). Я не проверял это сам (и я использовал код, аналогичный вашему в прошлом), но я надеюсь, что это поможет. - person Matthias Baetens   schedule 01.06.2017