Хо, могу ли я провести ленивый матч с Flink CEP

Я хочу использовать FlinkCEP только для «ленивого» поиска по шаблону. Как я могу это сделать? например У меня есть входной поток ACABCABCB, и я хочу сопоставить A с FollowBy C, чтобы получить только 3 совпадения, а не 6 совпадений.

Я создал следующий пример, чтобы проиллюстрировать свою проблему.

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

case class MyEvent(id: Int, kind: String, value: String)
case class MyAggregatedEvent(id: Int, concatenatedValue: String)

val eventStream = env.fromElements(
  MyEvent(1, "A", "1"), MyEvent(1, "C", "1"),
  MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"),
  MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"),
  MyEvent(1, "B", "3")
)

val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .next("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))

val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern)

val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect {
  (pattern: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) =>
    val partA = pattern.get("pA").get
    val partC = pattern.get("pC").get

    collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value))
}
outNextStream.print()

env.execute("Experiment")

Это дает мне следующий результат:

MyAggregatedEvent (1,1 => 1)

Когда я меняю узор на:

val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .followedBy("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))

Затем печатается следующее:

MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,1 => 2)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,1 => 3)
MyAggregatedEvent ( 1,2 => 3)
MyAggregatedEvent (1,3 => 3)

Как я могу создать шаблон, который соответствует каждому событию только один раз, чтобы мой результат был:

MyAggregatedEvent (1,1 => 1)
MyAggregatedEvent (1,2 => 2)
MyAggregatedEvent (1,3 => 3)


person FredT    schedule 06.07.2016    source источник


Ответы (1)


На данный момент это не поддерживается библиотекой Flink CEP. Семантику сопоставления пока нельзя контролировать. Думаю, было бы хорошо для начала добавить MATCH_ALL и режим совпадения MATCH_FIRST. MATCH_FIRST отбрасывает все промежуточные состояния, как только он видит полностью совпадающую последовательность. Это должно охватывать ваш вариант использования.

person Till Rohrmann    schedule 07.07.2016