Обнаружение шаблона Apache Flink CEP с помощью java [apache-flink]

Я хочу делать; чтобы начать с CEP с любым из элементов Arraylist, включенных в структуру карты, и продолжить с остальными элементами Arraylist, которые я начал. карта и структура рисунка:

final Map< Integer,ArrayList<String>> deger = new HashMap<Integer,ArrayList<String>>();
        deger.put(1,new ArrayList<String>(Arrays.asList("h:1","l:1","g:0")));
        deger.put(2,new ArrayList<String>(Arrays.asList("h:1","l:1","g:1")));
        deger.put(3,new ArrayList<String>(Arrays.asList("h:2","l:3","g:1")));
        deger.put(4,new ArrayList<String>(Arrays.asList("h:0","l:2","g:2")));

 for(int i=1;i<deger.size()+1;i++) {
            temp1.add(deger.get(i));
        }

Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                new SimpleCondition<String>() {
//                    @Override
                    public boolean filter(String value) throws Exception {

                        for (ArrayList<String> aa: temp1){
                            for (String dd : aa)
                                if(value.equals(dd)){ 
                                    return true;
                                }
                        }
                        return false;
                    }
                }
        ).followedBy("middle").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.equals(temp1.get(1));
                    }
                }
        ).followedBy("end").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.equals(temp1.get(2));
                    }
                }
        );

Моя цель - дать предупреждение с помощью элементов arrayylist на карте, но порядок элементов arraylist не важен из-за потока в нем. Я хочу продолжить с оставшимися элементами этого массива, где я могу вернуть информацию этого массива, когда Здесь я начинаю с любого массива. Например:

Incoming data = "l:1","h:1","g:0"
my pattern = "h:1","l:1","g:0" 
Start -> l:1 find
Middle -> g:0 or h:1 | h:1 find
End -> g:0 find -> alarm

person Taha ibişoglu    schedule 09.04.2019    source источник
comment
Эй, что именно здесь temp1?   -  person Dominik Wosiński    schedule 09.04.2019
comment
CEP в некоторой степени основан на временных рядах. Почему бы просто не использовать цикл для поиска элементов, если вам наплевать на время?   -  person Jiayi Liao    schedule 09.04.2019
comment
Я добавил temp1 прямо над функцией шаблона   -  person Taha ibişoglu    schedule 09.04.2019
comment
Я работаю в режиме реального времени, поэтому время важно   -  person Taha ibişoglu    schedule 09.04.2019
comment
Хорошо, просто чтобы прояснить. Вы хотите поднять тревогу, если есть какой-либо шаблон из списка в любом возможном порядке, но все элементы должны принадлежать к одному списку? Итак, вы хотите поднять тревогу, если "h:1","l:1","g:0" поступит в любом порядке, но если h: 1, l: 1, g: 2, то здесь не будет тревоги?   -  person Dominik Wosiński    schedule 09.04.2019
comment
не должно быть. для каждого списка должен генерироваться аварийный сигнал. так ч: 1, л: 1, г: 2 без будильника. Я хочу использовать ключ на карте, чтобы узнать, какой список сигналов тревоги генерируется.   -  person Taha ibişoglu    schedule 09.04.2019


Ответы (2)


 public static  Integer temp1;
    public static  Map<Integer,ArrayList<String>> temp2 = new HashMap<>();     
final Map< Integer,ArrayList<String>> deger = new HashMap<>();
            deger.put(1,new ArrayList<>(Arrays.asList("h:1","g:1","s:0")));
            deger.put(2,new ArrayList<>(Arrays.asList("h:1","g:1","g:0")));
            deger.put(3,new ArrayList<>(Arrays.asList("h:1","c:0","g:0")));
            deger.put(4,new ArrayList<>(Arrays.asList("h:1","s:1","g:0")));


            Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String value) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryStart : deger.entrySet()) {
                                if(entryStart.getValue().contains(value) && !temp2.containsKey(entryStart.getKey())){
                                        ArrayList<String> newList = new ArrayList<String>();
                                        newList.addAll(entryStart.getValue());
                                        newList.remove(value);
                                        temp2.put(entryStart.getKey(),newList);
                                        flag = true;
                                }
                            }
                            return flag;
                        }
                    }
            ).followedBy("middle").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String middle) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryMiddle : temp2.entrySet()) {
                                if(entryMiddle.getValue().contains(middle) && entryMiddle.getValue().size() == 2){
                                    ArrayList<String> newListMiddle = new ArrayList<String>();
                                    newListMiddle.addAll(entryMiddle.getValue());
                                    newListMiddle.remove(middle);
                                    temp2.put(entryMiddle.getKey(),newListMiddle);
                                    flag = true;
                                }
                            }
                            return flag;
                        }
                    }
            ).followedBy("end").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String end) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryEnd : temp2.entrySet()) {
                                if(entryEnd.getValue().contains(end) && entryEnd.getValue().size() == 1){
                                    flag = true;
                                    temp1 = entryEnd.getKey();
                                }
                            }
                            if (flag)
                                temp2.remove(temp1);
                            return flag;
                        }
                    }
            );

            PatternStream<String> patternStream = CEP.pattern(stream_itemset_ham,pattern);

            DataStream<String> result = patternStream.select(
                    new PatternSelectFunction<String, String>() {
                        @Override
                        public String select(Map<String, List<String>> map) throws Exception {
                            ArrayList<String> NewList= new ArrayList<>();
                            NewList.addAll(deger.get(temp1));
                            String found = "Found";
                            for (String list_element : NewList)
                                found += " " + list_element ;
                            return found;
                        }
                    }
            );
            result.print();

Из вашего вопроса я понимаю, что такое решение может быть предложено.

person samet yılmaz    schedule 10.04.2019
comment
Это решение будет в основном соответствовать каждой комбинации элементов всех шаблонов, независимо от того, действительно ли это шаблон или нет. Я имею в виду, что если у вас будет следующий входной поток "h:1", "d:1", "g:1" , "b:9" , "s:0", "z:0", "g:0", он все равно найдет два шаблона: h:1 g:1 g:0 и h:1 g:1 s:0, но во входных данных нет шаблонов. - person Dominik Wosiński; 10.04.2019
comment
Привет, Доминик! На самом деле ты прав и в этом. В моем коде важно найти патерн, но не обязательно найти элемент. Можете ли вы поделиться со мной, если у вас есть решение по-другому? - person samet yılmaz; 11.04.2019
comment
Привет, я добавил ответ :) - person Dominik Wosiński; 12.04.2019
comment
IterativeCondition не знал, что это такая вещь. Я выучил. Спасибо Доминик. - person samet yılmaz; 12.04.2019

поэтому в настоящее время AFAIK Flink не поддерживает неупорядоченные шаблоны из коробки, поэтому в основном я вижу два способа решения этой проблемы:

1) Вы можете создать все возможные шаблоны, которые хотите искать, и просто объединить все результирующие потоки данных.

2) Поскольку в этом сообщении предлагается FlinkCEP: Могу ли я ссылаться на более раннее событие, чтобы определить последующее совпадение? Вы можете попробовать использовать IterativeCondition, который позволит вам получить доступ к предыдущим элементам, которые уже были сопоставлены, поэтому в основном вам нужно будет определить шаблон, который соответствует всем возможным элементам из списки, а затем просто проверьте последнее условие, если все три из них принадлежат одному списку. Если да, то образец найден.

person Dominik Wosiński    schedule 10.04.2019