Я написал задание Apache Beam с использованием Scio с целью создания идентификаторов сеансов для входящих записей данных, а затем каким-либо образом их обогащать, прежде чем выводить их в BigQuery. Вот код:
val measurements = sc.customInput("ReadFromPubsub",
PubsubIO
.readMessagesWithAttributes()
.withTimestampAttribute("ts")
.fromSubscription(subscription)
)
measurements
.map(extractMeasurement).flatMap {
case Success(event) =>
Some(event)
case Failure(ex) =>
None
}
.timestampBy(_.timestamp)
.withSessionWindows(sessionGap, WindowOptions(
trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
allowedLateness = Duration.standardDays(1),
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.keyBy(_.clientID)
.groupByKey
.toWindowed
.map(assignSessionID)
.toSCollection.flatMap(_.results)
.map(enrich)
.saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)
Я использую временную метку события, которая является значением ключа атрибута ts
в PubsubMessage
, в качестве моего атрибута временной метки. Это та же временная метка, что и та, которую я использую в .timestampBy
, прежде чем обработать мои данные. Я ожидаю, что выходной триггер сработает, как только водяной знак пройдет мимо sessionGap (по умолчанию 30 минут).
При использовании бегуна Dataflow и DirectRunner триггер никогда не срабатывает, даже несмотря на то, что я имитирую данные с временными метками, разделенными более чем на 30 минут. В пользовательском интерфейсе потока данных я вижу, что водяной знак никогда не продвигается вперед на основе меток времени события, а только каждую вторую минуту, как если бы данные не были получены.
Я убедился, что данные действительно были получены, так как выполняется преобразование, предшествующее работе с окнами. Я также тестировал около 10 записей в секунду, но, возможно, этого недостаточно для обновления водяного знака? Я также настроил JobTest, в котором я получаю ожидаемый результат, также сигнализируя мне, что проблема основана на отметке времени/водяном знаке.
Я уверен, что пропустил что-то важное в документации или где-то допустил глупую ошибку и надеялся, что кто-то укажет мне правильное направление.