запуск через фиксированные интервалы при потоковой передаче луча apache

Я использую луч apache для написания потоковых конвейеров. Одним из требований для моего варианта использования является то, что я хочу запускать каждые X минут относительно времени начала или окончания окна. как я могу этого добиться. Текущий триггер AfterProcessingTime.pastFirstElementInPane () находится относительно времени обработки первого элемента в этом окне.

Например, я создал фиксированные 1-минутные окна, поэтому у меня есть window_1 (интервал 0-1 мин), window_2 (интервал 1-2 мин) и так далее. Теперь я хочу, чтобы результаты для каждого окна запускались ровно один раз через 10 минут с начала окна, то есть window_1 на 0 + 10 -> 10-й минуте, window_2 на 11-й минуте (1 + 10). [Примечание: я настраиваю фиксированные окна, чтобы позволить задержку> 10 минут, чтобы элементы не отбрасывались в случае задержки]

Есть ли способ добиться такого срабатывания для фиксированного окна.

Я не могу просто назначить все элементы глобальному окну, а затем выполнять повторный запуск каждую минуту, потому что тогда он теряет информацию о времени окна всех элементов. Например, если в моей коллекции pcollection есть 2 элемента, которые принадлежат window_1 и window_2 на основе метки времени события, но задержались на 3 и 3,2 минуты. Назначение их глобальному окну сгенерирует мне некоторый результат в конце 4-й минуты с учетом обоих элементов, тогда как на самом деле я хочу, чтобы они были назначены там фактическому фиксированному окну (как поздние данные).

Я хочу, чтобы элементы были назначены window_1 и window_2 на основе метки времени события, а затем window_1 запускался на 10-й минуте вывода результата путем обработки только 1 поздних данных для этого окна, а затем window_2 запускался на 11-й минуте с выводом после обработки единственного элемента, который пришла на 3,2 минуты с опозданием. Какой должна быть моя настройка триггера для достижения такого поведения в моем потоковом конвейере.


comment
Чтобы убедиться, что я правильно понимаю. Вы просите раздвижные окна. Т.е. окна которые перекрываются? то есть в секундах: [0, 10], [1, 11], [2,12] Рассмотрите возможность просмотра скользящих окон. cloud.google.com/dataflow/model/windowing#sliding-time- windows ИЛИ вы запрашиваете данные для 10-минутных окон. И хорошо ли обрабатываются запаздывающие данные, т. Е. Продолжать излучать каждый раз, когда вы получаете запоздалые данные?   -  person Alex Amato    schedule 11.05.2018
comment
Для allowed_lateness. Если вы просто хотите убедиться, что он в конечном итоге будет обработан, когда он поступит, вы можете установить allowedLatness на определенную продолжительность. Это поддерживает ваши элементы, чтобы их можно было повторно обработать в группе в случае поступления данных с опозданием. Но убедитесь, что вы установили файл accumulatingFiredPanes. cloud.google.com/dataflow/model/ cloud.google.com/dataflow/model /   -  person Alex Amato    schedule 11.05.2018
comment
@AlexAmato: я не имею в виду то, что вы сказали, ни скользящее окно, ни 10-минутное окно. Чтобы упростить задачу, забудьте о 10 минутах, скажем, я хочу создать фиксированные окна на 1 минуту, а затем сработать, когда окно закончится. Под концом окна я не имею в виду, когда он считается концом на основе прогресса водяного знака, но я имею в виду точное время обработки, когда оно заканчивается. Итак, скажем, начиная с эпохи, у меня есть 1-минутные окна, тогда текущее окно, скажем, N-е окно, я хочу, чтобы данные для этого окна были испущены / тигровали в начале в N + 1-м фиксированном окне (конец N-го окна).   -  person user179156    schedule 12.05.2018
comment
@AlexAmato: в этом есть смысл? 10 минут были обобщением этого. Я хочу, чтобы данные N-го окна запускались в N + x время (то есть через X минут после начала N-го окна размер окна может быть любым, от 1 секунды до 1 часа или более, не имеет значения, мне просто нужен триггер относительно время начала окна в качестве справки). Я говорю только о времени обработки, здесь нет времени события или предположения о том, когда окно заканчивается здесь. Просто сведите все в абсолютное время обработки.   -  person user179156    schedule 12.05.2018


Ответы (1)


Я считаю, что следующий код вам подходит:

pcollection | WindowInto(
    FixedWindows(1 * 60).configure().withAllowedLateness(),
    trigger=AfterProcessingTime(9 * 60),

Размер окна составляет 1 минуту, и через 9 минут оно запускает данные. Однако во многих случаях гораздо быстрее использовать скользящее окно, а затем позаботиться о дублированных обработанных элементах. Как упоминал AlexAmato, здесь также должны работать временные триггеры Watermarks и AfterWatermark Event.

person Shahin Vakilinia    schedule 04.07.2018
comment
Почему вы использовали 9 минут? Просто надеетесь, что ничего не придет позже? - person Davos; 17.01.2019