IllegalArgumentException - конвейер Apache BEAM - сдвиг отметки времени?

У меня есть существующий конвейер BEAM, который обрабатывает данные, полученные (из темы Google Pubsub) по 2 маршрутам. «Горячий» путь выполняет некоторые базовые преобразования и сохраняет их в Datastore, тогда как «холодный» путь выполняет фиксированное почасовое создание окон для более глубокого анализа перед сохранением.

Пока конвейер работает нормально, пока я не начал выполнять локальную буферизацию данных перед публикацией в Pubsub (поэтому данные поступают в Pubsub с опозданием на несколько часов). Возникающая ошибка выглядит следующим образом:

java.lang.IllegalArgumentException: Cannot output with timestamp 2018-06-19T14:00:56.862Z. Output timestamps must be no earlier than the timestamp of the current input (2018-06-19T14:01:01.862Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:463)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:429)
    at org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn.processElement(WithTimestamps.java:138)

Кажется, это ссылка на раздел моего кода (метод withTimestamps), который выполняет ежечасное оконное управление, как показано ниже:

Window<KV<String, Data>> window = Window.<KV<String, Data>>into
                (FixedWindows.of(Duration.standardHours(1)))
    .triggering(Repeatedly.forever(pastEndOfWindow()))
    .withAllowedLateness(Duration.standardSeconds(10))
    .discardingFiredPanes();

PCollection<KV<String, List<Data>>> keyToDataList = eData.apply("Add Event Timestamp", WithTimestamps.of(new EventTimestampFunction()))
    .apply("Windowing", window)
    .apply("Group by Key", GroupByKey.create())
    .apply("Sort by date", ParDo.of(new SortDataFn()));

Я не уверен, правильно ли понимаю, что сделал здесь неправильно. Ошибка возникает из-за того, что данные прибывают с опозданием? Насколько я понимаю, если данные прибывают с опозданием после допустимого опоздания, их следует отбросить и не выдавать ошибку, подобную той, которую я вижу.

Хотите знать, разрешит ли это установка неограниченного количества timestampSkew? Данные, которые поступают с опозданием, могут быть исключены из анализа, мне просто нужно убедиться, что не возникнут ошибки, которые задушат конвейер. Также нигде больше я не добавляю / изменяю временные метки для данных, поэтому я не уверен, почему возникают ошибки.


person jlyh    schedule 20.06.2018    source источник


Ответы (1)


Похоже, ваш DoFn использует « outputWithTimestamp », и вы пытаетесь установить временную метку, которая старше, чем временная метка входного элемента. Обычно временные метки выходных элементов выводятся из входных данных, это важно для обеспечения правильности вычисления водяного знака.

Вы можете обойти это, увеличив как сдвиг временной метки, так и допустимую задержку windwing, однако некоторые данные могут быть потеряны, вам решать, является ли такая потеря приемлемой в вашем сценарии.

Другой альтернативой является не использовать вывод с меткой времени, а вместо этого использовать метку времени сообщения PubSub для обработки каждого сообщения. Затем выведите каждый элемент как KV, где RealTimestamp вычисляется таким же образом, как вы в настоящее время обрабатываете временную метку (только не используйте ее в WithTimestamps), GroupByKey и запишите KV в Datastore.

Вы можете задать себе и другие вопросы:

  • Почему входные элементы связаны с самой последней меткой времени, чем выходные элементы?
  • Вам действительно нужно буферизовать такой объем данных перед публикацией в PubSub?
person ch_mike    schedule 24.10.2018