Можно ли обрабатывать несколько потоков в apache flink CEP?

Мой вопрос заключается в том, что если у нас есть два необработанных потока событий, т.е. Дым и Температура, и мы хотим выяснить, произошло ли сложное событие, т.е. Пожар применяя операторы к необработанным потокам, можем ли мы сделать это во Flink?

Я задаю этот вопрос, потому что все примеры, которые я видел до сих пор для Flink CEP, включают только один входной поток. Пожалуйста, поправьте меня, если я ошибаюсь.


person Amarjit Dhillon    schedule 28.07.2017    source источник


Ответы (2)


Короткий ответ. Да, вы можете считывать и обрабатывать несколько потоков и правил активации на основе ваших типов событий из разных источников потоков.

Длинный ответ. У меня было несколько похожее требование, и мой ответ основан на предположении, что вы читаете разные потоки из разных тем кафки.

Читайте из разных тем, которые транслируют разные события в одном источнике:

FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
        new StringSerializerToEvent(),
        props);

kafkaSource.assignTimestampsAndWatermarks(new 
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
        .filter(Objects::nonNull);

Сериализатор считывает данные и анализирует их в общем формате - например, для.

@Data
public class BAMEvent {
 private String keyid;  //If key based partitioning is needed
 private String eventName; // For different types of events
 private String eventId;  // Any other field you need
 private long timestamp; // For event time based processing 

 public String toString(){
   return eventName + " " + timestamp + " " + eventId + " " + correlationID;
 }

}

и после этого все довольно просто, определите правила на основе имени события и сравните имя события для определения правил (вы также можете определить сложные правила следующим образом):

Pattern.<BAMEvent>begin("first")
        .where(new SimpleCondition<BAMEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(BAMEvent event) throws Exception {
            return event.getEventName().equals("event1");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<BAMEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

            if (!secondEvent.getEventName().equals("event2")) {
              return false;
            }

            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getEventId = firstEvent.getEventId()) {
                return true;
              }
            }
            return false;
          }
        })
        .within(withinTimeRule);

Я надеюсь, что это натолкнет вас на идею объединить один или несколько различных потоков вместе.

person Biplob Biswas    schedule 31.07.2017

Интересно, можно ли выполнить строгую цепочку (вместо followBy, если можно использовать next), потому что в данном потоке может быть много событий для определенной метки времени. Итак, скажем, для времени t1-: a,b,c - эти три события приходят, а для времени t2-: a2,b2,c2 приходит к движку flink. Итак, мне интересно, как мы получаем событие (a).next (a2), потому что это может никогда не иметь место, поскольку ряд будет выглядеть примерно так: a b c a2 b2 c2

однако, если модуль CEP обрабатывает события таким образом, что он считает одну метку времени одним событием, то это имеет смысл.

person Anish Sarangi    schedule 20.04.2020