Сохранение Apache Beam в BigQuery с использованием Scio и явным указанием TriggeringFrequency

Я использую Spotify Scio для создания конвейера потока данных scala, который запускается сообщением Pub/Sub. Он читает из нашего личного DB, а затем вставляет информацию в BigQuery.

Проблема в:

  • Мне нужно удалить предыдущие данные
  • Для этого мне нужно использовать режим записи WRITE_TRUNCATE
  • Но задание автоматически регистрируется как потоковое, поэтому я получаю следующую ошибку: WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
  • Поэтому мне нужно вручную изменить конвейер на Batch, указав частоту срабатывания.

Итак, до сих пор у меня был следующий конвейер:

sc
  .customInput("Job Trigger", inputIO)
  .map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
  .flatten
  .withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
  .groupBy(_.ssoId)
  .map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
  .filter(_.isSuccess)
  .map(_.get)
  .saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)

Я не могу найти способ указать частоту срабатывания при использовании scio api (saveAsBigQuery).

Он присутствует только в родном beam api:

BigQueryIO
  .write()
  .withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
  .to(bqTableName)
  .withSchema(getSchema)
  .withCreateDisposition(CREATE_NEVER)
  .withWriteDisposition(WRITE_TRUNCATE)

Если я использую BigQueryIO, мне придется использовать sc.pipeline.apply вместо моего текущего конвейера.

Есть ли способ как-то интегрировать BigQueryIO в мой текущий конвейер или как-то указать withTriggeringFrequency в текущем конвейере?


person MaxG    schedule 17.06.2019    source источник


Ответы (1)


Scio в настоящее время не поддерживает указание метода, который будет использоваться для загрузки данных в Big Query. Поскольку это невозможно, автоматически STREAMING_INSERTS используется для неограниченных коллекций, которые, очевидно, не могут поддерживать усечение. Следовательно, вам нужно вернуться к Beam BigQueryIO, указав частоту срабатывания (withTriggeringFrequency(...)) и метод (withMethod(Method.FILE_LOADS)).

Чтобы интегрировать его в свой конвейер Scio, вы можете просто использовать saveAsCustomOutput. Пример также можно найти здесь: https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library

person f.loris    schedule 17.06.2019
comment
но я не использую загрузку файлов в качестве триггера - person MaxG; 17.06.2019
comment
Частота срабатывания определяет, когда запускается загрузка данных в Big Query. Это может работать только при использовании FILE_LOADS. Ознакомьтесь с документами: beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/io/ Итак, если вы хочу использовать WRITE_TRUNCATE, это единственный шанс заставить его работать. - person f.loris; 17.06.2019