Как собрать поздние данные в оконной обработке потока Flink

Представьте, что у меня есть поток данных, содержащий данные о времени события. Я хочу собрать поток входных данных за время окна 8 миллисекунд и уменьшить данные каждого окна. Я делаю это с помощью следующего кода:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()

Точка: ключ потока данных - это временная метка времени обработки, сопоставленная с последними 8 частями временной метки обработки миллисекунды, например 1531569851297 будет сопоставлена ​​с 1531569851296.

Но возможно, что поток данных прибыл с опозданием и пришел не в то время окна. Например, предположим, что я установил время окна на 8 миллисекунд. Если данные поступают в движок Flink по порядку или хотя бы с задержкой меньше, чем время окна (8 миллисекунд), это будет лучшим случаем. Но предположим, что время события потока данных (это также поле в потоке данных) прибыло с задержкой 30 миллисекунд. Таким образом, он войдет в неправильное окно, и я думаю, что если я проверю время события каждого потока данных, поскольку он хочет войти в окно, я могу отфильтровать такие поздние данные. Итак, у меня два вопроса:

  • Как я могу отфильтровать поток данных, если он хочет войти в окно, и проверить, созданы ли данные в нужную метку времени для окна?
  • Как я могу собрать такие поздние данные в переменной, чтобы обработать их?

person Soheil Pourbafrani    schedule 14.07.2018    source источник


Ответы (1)


Flink имеет две разные связанные абстракции, которые имеют дело с различными аспектами вычисления оконной аналитики для потоков с временными метками событий: водяные знаки и допустимая задержка.

Во-первых, водяные знаки, которые используются при работе с данными времени события (независимо от того, используете ли вы окна). Водяные знаки предоставляют Flink информацию о продвижении времени события и дают вам, разработчику приложения, средства борьбы с неупорядоченными данными. Водяные знаки текут вместе с потоком данных, и каждый из них отмечает позицию в потоке и содержит временную метку. Водяной знак служит подтверждением того, что в этой точке потока поток теперь (вероятно) завершен до этой отметки времени - или, другими словами, события, следующие за водяным знаком, вряд ли будут происходить раньше времени, указанного меткой времени. водяной знак. Наиболее распространенная стратегия водяных знаков - использовать BoundedOutOfOrdernessTimestampExtractor, который предполагает, что события прибывают с некоторой фиксированной ограниченной задержкой.

Теперь это дает определение задержки - события, которые следуют за водяным знаком с отметкой времени меньше, чем временная метка водяного знака, считаются поздними.

API окна предоставляет понятие допустимой задержки, которое по умолчанию установлено на ноль. Если допустимая задержка больше нуля, то триггер по умолчанию для окон времени событий будет принимать поздние события в соответствующие окна до предела допустимой задержки. Действие окна будет запускаться один раз в обычное время, а затем еще раз для каждого позднего события до конца разрешенного интервала задержки. После этого поздние события отбрасываются (или собираются на побочный выход, если он настроен).

How can I filter data stream as it wants to enter the window and check 
if the data created at the right timestamp for the window?

Назначители окон Flink несут ответственность за присвоение событий соответствующим окнам - все будет происходить автоматически. При необходимости будут созданы новые экземпляры окон.

How can I gather such late data in a variable to do some processing on them?

Вы можете либо быть достаточно щедрыми в водяных знаках, чтобы избежать каких-либо поздних данных, и / или настроить допустимую задержку, чтобы она была достаточно длинной, чтобы учесть поздние события. Однако имейте в виду, что Flink будет вынужден держать открытыми все окна, которые все еще принимают поздние события, что задержит сбор мусора старых окон и может потреблять значительный объем памяти.

Обратите внимание, что в этом обсуждении предполагается, что вы хотите работать с временными окнами - например, окна длиной 8 мсек, с которыми вы работаете. Flink также поддерживает окна подсчета (например, групповые события в пакеты по 100), окна сеанса и настраиваемую логику окна. Водяные знаки и опоздание не играют никакой роли, например, если вы используете окна подсчета.

Если вы хотите получить результаты для каждой клавиши для своей аналитики, используйте keyBy для разделения потока по ключу (например, по userId) перед применением окон. Например

stream
  .keyBy(e -> e.userId)
  .timeWindow(Time.seconds(10))
  .reduce(...)

будет выдавать отдельные результаты для каждого идентификатора пользователя.

Обновление: обратите внимание, что в последних версиях Flink теперь для окон можно собирать поздние события для побочного вывода.

Некоторая соответствующая документация:

Время события и водяные знаки
Допустимая задержка

person David Anderson    schedule 16.07.2018
comment
Спасибо за подробный ответ. Могу ли я использовать BoundedOutOfOrdernessTimestampExtractor и окно подсчета вместе? Вы знаете, что на самом деле я хочу иметь окно подсчета, потому что, согласно моему пониманию окон Flink в случае окна подсчета, его назначитель окна будет отправлять все входящие данные с одним и тем же ключом в одно и то же окно (я прав?) И время из этого окна id не заполнялось до 100 мс, оно должно сработать. Итак, я хочу знать, могу ли я использовать как окно BoundedOutOfOrdernessTimestampExtractor, так и окно подсчета, для которого с помощью водяного знака я установлю этот тайм-аут? - person Soheil Pourbafrani; 16.07.2018
comment
Если вы примените какое-либо окно к потоку с ключом, то поток будет разделен по ключу, и для каждого ключа будет выполняться отдельный анализ - будь то окно счетчика, временное окно, окно сеанса или пользовательское окно. Однако окна счета не имеют ничего общего со временем. Они игнорируют временные метки и водяные знаки и просто подсчитывают события, формируя пакеты из n событий. BoundedOutOfOrdernessTimestampExtractor ничего не сделает для окна подсчета, а окно подсчета никогда не истечет по таймауту. - person David Anderson; 16.07.2018
comment
Вы имеете в виду в любом окне, если у меня есть три сообщения с ключом 1, 2, 1: он отправит два сообщения с ключом 1 в одно и то же окно и сообщение с ключом 2 в другое окно? - person Soheil Pourbafrani; 16.07.2018
comment
@DavidAnderson Вы упомянули, что новые экземпляры окон будут создаваться по мере необходимости, знаете ли вы, создаются ли они параллельно? Если мой код работает с глобальным состоянием окна и зависит от порядка обработки окон, одновременное открытие нескольких окон будет проблематичным. - person Alexis; 19.06.2019
comment
Для любой данной клавиши Flink будет работать только с одним окном за раз. Но одновременно могут открываться несколько окон, причем не по порядку. Например, если у вас есть окна длиной в одну минуту, а события приходят с отметками времени 11:59:59, 12:01:01, а затем 12:00:50, окно для 12:00 будет создано после окна для 12:01 (при условии, что водяные знаки и / или допустимая задержка позволяют создать окно на 12:00). - person David Anderson; 19.06.2019