Последовательное выполнение операций чтения - Apache Beam

Мне нужно выполнить следующие операции в указанной последовательности: -

 PCollection<String> read = p.apply("Read Lines",TextIO.read().from(options.getInputFile())) 

      .apply("Get fileName",ParDo.of(new DoFn<String,String>(){
          ValueProvider<String> fileReceived = options.getfilename();
          @ProcessElement
          public void procesElement(ProcessContext c)
          {
              fileName = fileReceived.get().toString();
              LOG.info("File: "+fileName);
          }
      }));

      PCollection<TableRow> rows = p.apply("Read from BigQuery",
              BigQueryIO.read()
                  .fromQuery("SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName +"'")
              .usingStandardSql());

Как это сделать в Apache Beam / Dataflow?


person rish0097    schedule 17.07.2017    source источник
comment
Не могли бы вы рассказать больше о своем сценарии использования? Похоже, что это простые операции чтения без каких-либо побочных эффектов, поэтому я не понимаю, почему это имеет значение, выполняются ли они последовательно или параллельно, или как внешний наблюдатель сможет определить, что из этого имеет место.   -  person jkff    schedule 18.07.2017
comment
Хорошо ... как вы могли заметить, я использую значение переменной в fileName, полученное в операции Get fileName, в следующем запросе для чтения из таблицы BigQuery. Но что происходит, так это то, что операция чтения из BigQuery выполняется до получения fileName и, следовательно, получает нулевое значение. Поэтому обязательно, чтобы операции выполнялись последовательно. Я предполагаю, что это происходит потому, что я снова использую p.apply при чтении из BigQuery ... как справиться с этой ситуацией?   -  person rish0097    schedule 18.07.2017
comment
Ох, понятно, я пропустил эту часть. Прежде чем я коснусь этого - меня смущает что-то еще в вашем коде. Ваш DoFn всегда выводит одно и то же значение, исходящее из ваших PipelineOptions, и игнорирует содержимое своей входной коллекции PCollection (т.е. результат вашего TextIO.read () фактически отбрасывается). Это намеренно?   -  person jkff    schedule 18.07.2017
comment
Да ... я имею в виду, что я сделал это только для доступа к значению filaName ... результат TextIO.read () используется позже в программе ... поэтому он не выбрасывается как таковой ...   -  person rish0097    schedule 19.07.2017
comment
Как написано в фрагменте кода, он выброшен - я полагаю, ваша фактическая программа тогда будет другой. Кроме того, в этом фрагменте вы выводите значение options.getfilename () не один раз, а N его копий, где N - количество строк во всех файлах, соответствующих шаблону getInputFile (), т.е. чтение PCollection содержит такое количество идентичных копий. из options.getfilename (). Думаю, я могу подсказать, как делать то, что вы на самом деле пытаетесь сделать; отправлю ответ.   -  person jkff    schedule 19.07.2017


Ответы (1)


Похоже, вы хотите применить BigQueryIO.read().fromQuery() к запросу, который зависит от значения, доступного через свойство типа ValueProvider<String> в вашем PipelineOptions, а поставщик недоступен во время построения конвейера, то есть вы вызываете свое задание через шаблон.

В этом случае правильным решением будет использовать NestedValueProvider:

PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(
    NestedValueProvider.of(
      options.getfilename(),
      new SerializableFunction<String, String>() {
        @Override
        public String apply(String filename) {
          return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName +"'";
        }
      })));
person jkff    schedule 19.07.2017
comment
У меня просто есть еще один вопрос, связанный с этим, если вы не возражаете ... есть ли способ напрямую вставить данные из файла, прочитанного в строках чтения, в BigQuery, используя схему и таблицу, полученные из вышеуказанной функции. Я попытался сослаться на DynamicDestinations в Apache Beam и сообщение, которое вы предложили - stackoverflow.com/a/43505535/278042, но не мог понять, как это сделать. Спасибо. - person rish0097; 19.07.2017
comment
Не могли бы вы опубликовать для этого отдельный вопрос, включая более подробную информацию о том, что именно вы пробовали, а что не работает? Таким образом, больше людей смогут увидеть это и извлечь уроки из ответа. - person jkff; 19.07.2017
comment
Конечно @jkff ... !! - person rish0097; 19.07.2017