Обнаружение паттернов FlinkCEP не происходит в реальном времени

Я все еще новичок в библиотеке Flink CEP, и все же я не понимаю поведения обнаружения шаблонов. Рассматривая приведенный ниже пример, у меня есть приложение Flink, которое потребляет данные из темы kafka, данные создаются периодически, я хочу использовать шаблон Flink CEP, чтобы определять, когда значение превышает заданный порог. Код ниже:

public class CEPJob{
    
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
                properties);

        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        DataStream<String> stream = env.addSource(consumer);

        // Process incoming data.
        DataStream<Stock> inputEventStream = stream.map(new MapFunction<String, Stock>() {

            private static final long serialVersionUID = -491668877013085114L;

            @Override
            public Stock map(String value) {
                String[] data = value.split(":");

                System.out.println("Date: " + data[0] + ", Adj Close: " + data[1]);

                Stock stock = new Stock(data[0], Double.parseDouble(data[1]));

                return stock;
            }
        });

        // Create the pattern
        Pattern<Stock, ?> myPattern = Pattern.<Stock>begin("first").where(new SimpleCondition<Stock>() {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(Stock value) throws Exception {
                return (value.getAdj_Close() > 140.0);
            }

        });

        // Create a pattern stream from our warning pattern
        PatternStream<Stock> myPatternStream = CEP.pattern(inputEventStream, myPattern);

        // Generate alert for each matched pattern
        DataStream<Stock> warnings = myPatternStream .select((Map<String, List<Stock>> pattern) -> {
            Stock first = pattern.get("first").get(0);

            return first;
        });

        warnings.print();

        env.execute("CEP job");
    }
}

Что происходит, когда я запускаю задание, обнаружение шаблона не происходит в реальном времени, оно выводит предупреждение об обнаруженном шаблоне текущей записи только после создания второй записи, похоже, что печать в журнал отложена. warning, я действительно не понял, как заставить его выводить предупреждение, когда он обнаруживает шаблон, не дожидаясь следующей записи, и спасибо :).

Данные, поступающие из Kafka, имеют строковый формат: дата: значение, данные производятся каждые 5 секунд.

Версия Java: 1.8, версия Scala: 2.11.12, версия Flink: 1.12.2, версия Kafka: 2.3.0


person Yassin Rebai    schedule 20.04.2021    source источник


Ответы (1)


Решение, которое я обнаружил, чтобы отправлять поддельную запись (например, нулевой объект) в теме Kafka каждый раз, когда я создаю значение для темы, а на стороне Flink (в объявлении шаблона) я проверяю, является ли полученная запись поддельной или не. Похоже, что FlinkCEP всегда ожидает предстоящего события, прежде чем выдает предупреждение.

person Yassin Rebai    schedule 21.04.2021