Короткий ответ. Да, вы можете считывать и обрабатывать несколько потоков и правил активации на основе ваших типов событий из разных источников потоков.
Длинный ответ. У меня было несколько похожее требование, и мой ответ основан на предположении, что вы читаете разные потоки из разных тем кафки.
Читайте из разных тем, которые транслируют разные события в одном источнике:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
Сериализатор считывает данные и анализирует их в общем формате - например, для.
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
и после этого все довольно просто, определите правила на основе имени события и сравните имя события для определения правил (вы также можете определить сложные правила следующим образом):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
Я надеюсь, что это натолкнет вас на идею объединить один или несколько различных потоков вместе.
person
Biplob Biswas
schedule
31.07.2017