Я использую конвейер следующим образом:
inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
processBroadcastElement(...){...}
processElement(...){...}
}).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())
В CustomTrigger()
я регистрирую eventTimeTimer()
, который срабатывает, указывая на конец моего окна. Проблема в том, что метод onEventTime()
никогда не вызывается, даже если:
- Я заверил
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- Используя
ascendingTimestampExtractor()
, я отправил событие, которое определенно подтолкнуло водяной знак настолько далеко, чтоeventTimeTimer()
должен сработать.
Что мне не хватает? Связано ли это с отсутствием водяных знаков и onTimer()
методом KeyedBroadcastProcessFunction
? Я подозреваю это из-за комментария Дэвида Андерсона в this ответ:
добавить специальные поддельные водяные знаки для нешироковещательного потока (установите значение Watermark.MAX_WATERMARK)
и тот факт, что я не реализовал метод с именем Timer. Однако, если это действительно так, я не понимаю, как это будет иметь отношение к нисходящему триггеру. Спасибо.
Изменить: полный пример этого сценария находится здесь.