Как запустить нисходящий метод onEventTime () при использовании шаблона BroadcastState?

Я использую конвейер следующим образом:

inputStream.keyBy(<keyMapper>).
connect(configurationBroadcastStream).
process(new KeyedBroadcastProcessFunction<...>() {
     processBroadcastElement(...){...}
     processElement(...){...}
     }).
keyBy(<keyMapper>). // have to key output of process() again
window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
trigger(new CustomTrigger()).
process(new CustomProcessWindowFn())

В CustomTrigger() я регистрирую eventTimeTimer(), который срабатывает, указывая на конец моего окна. Проблема в том, что метод onEventTime() никогда не вызывается, даже если:

  • Я заверил env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • Используя ascendingTimestampExtractor(), я отправил событие, которое определенно подтолкнуло водяной знак настолько далеко, что eventTimeTimer() должен сработать.

Что мне не хватает? Связано ли это с отсутствием водяных знаков и onTimer() методом KeyedBroadcastProcessFunction? Я подозреваю это из-за комментария Дэвида Андерсона в this ответ:

добавить специальные поддельные водяные знаки для нешироковещательного потока (установите значение Watermark.MAX_WATERMARK)

и тот факт, что я не реализовал метод с именем Timer. Однако, если это действительно так, я не понимаю, как это будет иметь отношение к нисходящему триггеру. Спасибо.

Изменить: полный пример этого сценария находится здесь.


person 343GuiltySpark    schedule 27.02.2020    source источник
comment
Чтобы уточнить, настраиваемый триггер пытается реализовать функциональность, описанную в примере Тилля [1], при использовании шаблона broadcastState. [1] .html /   -  person 343GuiltySpark    schedule 27.02.2020


Ответы (1)


Да, проблема в том, что у широковещательного потока нет водяных знаков. (Но нет, не имеет значения, есть ли у KeyedBroadcastProcessFunction метод onTimer или нет. Как только вы получите водяной знак, он все равно перейдет в окно.)

Всякий раз, когда оператор имеет два или более входа - так, в вашем случае, когда inputStream и configurationBroadcastStream соединены - водяной знак на этом операторе будет минимальным из водяных знаков с его входов. Поскольку в широковещательном потоке нет водяных знаков, это сдерживает водяные знаки, предоставленные inputStream.

У меня есть пример, показывающий, как вы можете с этим справиться. Предполагая, что вашему широковещательному потоку не требуется какая-либо информация о времени, вы можете реализовать экстрактор временных меток и назначитель водяных знаков, который эффективно передает управление водяными знаками другому потоку. Что-то вроде этого:

// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.

public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return 0;
    }
}
person David Anderson    schedule 27.02.2020
comment
Это действительно была проблема, спасибо. Как вы думаете, стоит ли добавлять проверки на уровне API, чтобы гарантировать, что такого сценария не произойдет при создании конвейера операторов? То есть, если пользователь пытается соединить поток времени события с потоком вещания, Flink заставит пользователя реализовать getCurrentWatermark () для потока вещания. - person 343GuiltySpark; 28.02.2020
comment
Хорошая точка зрения; Мне нравится эта идея. Но я думаю, что к этому следует подходить в более общем плане. Везде, где конвейер использует таймеры времени событий, все источники, которые могут связаться с этим оператором, должны иметь водяные знаки. Возможно, над проверкой этого можно будет поработать после продолжающейся переработки водяных знаков, проводимой с помощью FLIP-27. - person David Anderson; 28.02.2020