Siddhi CEP: агрегирование строковых значений сгруппированных событий в пакетном временном окне.

Я использую сиддхи, чтобы уменьшить количество событий, существующих в системе. Для этого я объявил временное окно пакетной обработки, в котором все события группируются на основе их target_ip.

from Events#window.timeBatch(30 sec)
select id as meta_ID, Target_IP4 as target_ip
group by Target_IP4
insert into temp;

В результате я хотел бы получить одно событие для каждого target_ip и значения параметра meta_ID в виде конкатенации отдельных событий, образующих событие.

Проблема в том, что предыдущий запрос генерирует столько событий, сколько различных значений meta_ID. например, я получаю

  1. "id_10", "target_1"
  2. «id_11», «target_1»

И я хотел бы иметь

  1. "id_10, id_11", "target_1"

Я знаю, что в моем запросе отсутствует какой-то метод агрегации, я видел много функций агрегации в Siddhi, включая расширение siddhi-execution-string, которое имеет метод str: concat, но я не знаю, как его использовать для агрегирования значений meta_ID. Любая идея?


person Peter Rubi    schedule 09.04.2018    source источник


Ответы (1)


Вы можете написать план выполнения, как показано ниже, для достижения ваших требований:

define stream inputStream (id string, target string);

-- Query 1
from inputStream#window.timeBatch(30 sec)
select *
insert into temp;

-- Query 2
from temp#custom:aggregator(id, target) 
select *
insert into reducedStream;

Здесь custom: aggregator - это настраиваемое расширение потокового процессора, которое вам нужно будет реализовать. Вы можете следовать [1] при его реализации.

Позвольте мне немного объяснить, как все работает:

Запрос 1 генерирует пакет событий каждые 30 секунд. Другими словами, мы используем Query 1 для создания пакета событий.

Таким образом, в конце каждого 30-секундного интервала пакет событий будет загружен в обработчик потока custom: aggregator. Когда входные данные поступают в потоковый процессор, срабатывает его метод process ().

@Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        //implement the aggregation & grouping logic here
}

Пакет событий находится в streamEventChunk. При реализации метода process () вы можете перебирать streamEventChunk и создавать по одному событию для каждого пункта назначения. Вам нужно будет реализовать эту логику в методе process ().

[1] https://docs.wso2.com/display/CEP420/Writing+a+Custom+Stream+Processor+Extension

person Dilini    schedule 18.04.2018