Я использую Spotify Scio
для создания конвейера потока данных scala, который запускается сообщением Pub/Sub
. Он читает из нашего личного DB
, а затем вставляет информацию в BigQuery
.
Проблема в:
- Мне нужно удалить предыдущие данные
- Для этого мне нужно использовать режим записи
WRITE_TRUNCATE
- Но задание автоматически регистрируется как потоковое, поэтому я получаю следующую ошибку:
WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
- Поэтому мне нужно вручную изменить конвейер на
Batch
, указав частоту срабатывания.
Итак, до сих пор у меня был следующий конвейер:
sc
.customInput("Job Trigger", inputIO)
.map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
.flatten
.withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
.groupBy(_.ssoId)
.map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
.filter(_.isSuccess)
.map(_.get)
.saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)
Я не могу найти способ указать частоту срабатывания при использовании scio
api (saveAsBigQuery
).
Он присутствует только в родном beam
api:
BigQueryIO
.write()
.withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
.to(bqTableName)
.withSchema(getSchema)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_TRUNCATE)
Если я использую BigQueryIO
, мне придется использовать sc.pipeline.apply
вместо моего текущего конвейера.
Есть ли способ как-то интегрировать BigQueryIO
в мой текущий конвейер или как-то указать withTriggeringFrequency
в текущем конвейере?