У меня есть существующий конвейер BEAM, который обрабатывает данные, полученные (из темы Google Pubsub) по 2 маршрутам. «Горячий» путь выполняет некоторые базовые преобразования и сохраняет их в Datastore, тогда как «холодный» путь выполняет фиксированное почасовое создание окон для более глубокого анализа перед сохранением.
Пока конвейер работает нормально, пока я не начал выполнять локальную буферизацию данных перед публикацией в Pubsub (поэтому данные поступают в Pubsub с опозданием на несколько часов). Возникающая ошибка выглядит следующим образом:
java.lang.IllegalArgumentException: Cannot output with timestamp 2018-06-19T14:00:56.862Z. Output timestamps must be no earlier than the timestamp of the current input (2018-06-19T14:01:01.862Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:463)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:429)
at org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn.processElement(WithTimestamps.java:138)
Кажется, это ссылка на раздел моего кода (метод withTimestamps), который выполняет ежечасное оконное управление, как показано ниже:
Window<KV<String, Data>> window = Window.<KV<String, Data>>into
(FixedWindows.of(Duration.standardHours(1)))
.triggering(Repeatedly.forever(pastEndOfWindow()))
.withAllowedLateness(Duration.standardSeconds(10))
.discardingFiredPanes();
PCollection<KV<String, List<Data>>> keyToDataList = eData.apply("Add Event Timestamp", WithTimestamps.of(new EventTimestampFunction()))
.apply("Windowing", window)
.apply("Group by Key", GroupByKey.create())
.apply("Sort by date", ParDo.of(new SortDataFn()));
Я не уверен, правильно ли понимаю, что сделал здесь неправильно. Ошибка возникает из-за того, что данные прибывают с опозданием? Насколько я понимаю, если данные прибывают с опозданием после допустимого опоздания, их следует отбросить и не выдавать ошибку, подобную той, которую я вижу.
Хотите знать, разрешит ли это установка неограниченного количества timestampSkew? Данные, которые поступают с опозданием, могут быть исключены из анализа, мне просто нужно убедиться, что не возникнут ошибки, которые задушат конвейер. Также нигде больше я не добавляю / изменяю временные метки для данных, поэтому я не уверен, почему возникают ошибки.