Apache Flink CEP, как пройти во временном окне на основе значения события?

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

Есть ли способ заменить Time.seconds(10) на value.getSomeTimeField(), который я передаю через Event?


person atkayla    schedule 15.05.2018    source источник


Ответы (1)


Я думаю, вы хотите работать в режиме времени события. Подробнее об этом вы можете узнать в этих документах. и это раздел о том, как извлечь метку времени из элемента.

В вашем примере вы можете сделать что-то вроде:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {

    @Override
    public long extractAscendingTimestamp(MyEvent element) {
        return value.getSomeTimeField();
    }
})

CEP.pattern(input, pattern).select(...)

Таким образом, события будут автоматически сортироваться в потоке, а тайм-аут будет применяться в обоих случаях в отношении поля времени.

person Dawid Wysakowicz    schedule 16.05.2018
comment
Так что даже без .within(...) это вызовет тайм-ауты? Да, идея в том, что я не хочу жестко кодировать окно, например. не Time.seconds(5), но он будет основан на этом наборе событий, например. отправка определенного вида посылки. - person atkayla; 16.05.2018
comment
Ах, это немного меняет ситуацию, нет, к сожалению, значение в предложении within должно быть стабильным значением. Он не может основываться на свойствах событий. - person Dawid Wysakowicz; 16.05.2018