Водяной знак PubSub не продвигается

Я написал задание Apache Beam с использованием Scio с целью создания идентификаторов сеансов для входящих записей данных, а затем каким-либо образом их обогащать, прежде чем выводить их в BigQuery. Вот код:

val measurements = sc.customInput("ReadFromPubsub",
  PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("ts")
    .fromSubscription(subscription)
)

measurements
    .map(extractMeasurement).flatMap {
      case Success(event) =>
        Some(event)
      case Failure(ex) =>
        None
    }
    .timestampBy(_.timestamp)
    .withSessionWindows(sessionGap, WindowOptions(
      trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.standardDays(1),
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
    .keyBy(_.clientID)
    .groupByKey
    .toWindowed
    .map(assignSessionID)
    .toSCollection.flatMap(_.results)
    .map(enrich)
    .saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)

Я использую временную метку события, которая является значением ключа атрибута ts в PubsubMessage, в качестве моего атрибута временной метки. Это та же временная метка, что и та, которую я использую в .timestampBy, прежде чем обработать мои данные. Я ожидаю, что выходной триггер сработает, как только водяной знак пройдет мимо sessionGap (по умолчанию 30 минут).

При использовании бегуна Dataflow и DirectRunner триггер никогда не срабатывает, даже несмотря на то, что я имитирую данные с временными метками, разделенными более чем на 30 минут. В пользовательском интерфейсе потока данных я вижу, что водяной знак никогда не продвигается вперед на основе меток времени события, а только каждую вторую минуту, как если бы данные не были получены.

Я убедился, что данные действительно были получены, так как выполняется преобразование, предшествующее работе с окнами. Я также тестировал около 10 записей в секунду, но, возможно, этого недостаточно для обновления водяного знака? Я также настроил JobTest, в котором я получаю ожидаемый результат, также сигнализируя мне, что проблема основана на отметке времени/водяном знаке.

Я уверен, что пропустил что-то важное в документации или где-то допустил глупую ошибку и надеялся, что кто-то укажет мне правильное направление.


person simpaj    schedule 05.06.2018    source источник


Ответы (2)


Когда вы публикуете свои сообщения в pubsub, как вы генерируете метки времени, которые вы записываете в атрибут «ts» вашего сообщения, и как вы их кодируете?

Если я правильно помню, временные метки должны быть закодированы в соответствии со спецификацией RFC3339, например, что-то вроде этого "2020-10-02T10:00:00-05:00"

Вы также можете попробовать временно удалить строку ".withTimestampAttribute("ts")", чтобы автоматически генерировались временные метки. Затем проверьте, продвигается ли ваш водяной знак. Если это так, это указывает на проблему со значениями отметки времени (например, возможно, значения не соответствуют вашим ожиданиям) или их кодировкой.

Наконец, если вы используете средство запуска облачных потоков данных, взгляните на страницу статуса задания. Это должно показать вам текущее значение водяного знака данных. Вы можете проверить его, чтобы увидеть, соответствует ли он вашим ожиданиям.

person Slava Chernyak    schedule 21.05.2020

Вы можете попробовать добавить раннее и позднее срабатывание в AfterWatermark.pastEndofWindows. чтобы узнать, обновляется ли водяной знак, а также проверить наличие поздних данных. Также вы можете найти документацию по триггерам здесь.

person Amruth Bahadursha    schedule 29.06.2018