Как обновить поток ответом из другого потока, где тип приемника - http-ответ

Я пытаюсь обогатить свой входной поток дополнительным атрибутом, который заполняется через приемник ответа "http-response".

Я пробовал использовать "join" с атрибутом window и с ключевым словом "every", чтобы объединить два потока и вставить полученный объединенный поток в другой поток, чтобы обогатить его.

Атрибуты окна (window.time (1 секунда) или window.length (1)) и ключевое слово «каждое» хорошо работают, когда входящие события приходят с регулярным интервалом в 1 секунду или более.

Когда (скажем, 10 или 100) событий отправляются одновременно (в течение секунды). Тогда результат слияния не ожидаемый.

Тот, у кого есть атрибут "окно" (присоединиться)

**

from EventInputStreamOne#window.time(1 sec) as i
        join EventInputStreamTwo as s
        on i.variable2 == s.variable2
select i.variable1 as variable1, i.variable2 as variable2, s.variable2 as variable2
insert into EventOutputStream;

**

Тот, у которого есть ключевое слово "каждый"

**

from every e1=EventInputStream,e2=EventResponseStream
select e1.variable1 as variable1, e1.variable2 as variable2, e2.variable3 as variable3
insert into EventOutputStream;

**

Есть ли лучший способ объединить два потока, чтобы обновить третий поток?


person Vigneshwaran    schedule 01.10.2019    source источник
comment
Чтобы было ясно, EventInputStreamOne - это запрос, который вы отправляете через источник http-запроса, а EventResponseStream - это ответ на него через http-response.   -  person Niveathika    schedule 01.10.2019
comment
да ... правильно ...   -  person Vigneshwaran    schedule 01.10.2019


Ответы (2)


Чтобы получить исходные атрибуты запроса, вы можете использовать настраиваемое сопоставление следующим образом:

@source(type='http-call-response', sink.id='source-1'
       @map(type='json',@attributes(name='name', id='id', volume='trp:volume', price='trp:price')))
define stream responseStream(name String, id int, headers String, volume long, price float);

Здесь к атрибутам запроса можно получить доступ с помощью trp:attributeName, в этом примере только имя взято из ответа, цена и объем взяты из запроса.

person Niveathika    schedule 02.10.2019
comment
Я надеюсь, что все, что вы предложили, предназначено для объединения потока ввода и ответа и для вставки полученного потока в третий поток ... Я пробовал то, что вы предложили, но получаю Нет расширения для источника: http-call-response error ... Я тоже пробовал с http-ответом, но результат не соответствует ожидаемым ... - person Vigneshwaran; 03.10.2019
comment
http-call-response - это последний приемник для устаревшего приемника HTTP-ответа. Какую версию сиддхи вы примеряете? и версия расширения io-http. - person Niveathika; 03.10.2019
comment
Я использую версию siddhi 1.2.0, которая встроена в wso2 версии 4.4.0. Есть ли у меня файл jar обновления? - person Vigneshwaran; 03.10.2019
comment
Кажется, выпущена последняя версия 2.0.5, https://mvnrepository.com/artifact/org.wso2.extension.siddhi.io.http/siddhi-io-http. Давайте попробуем с этим, если нет, пожалуйста, поделитесь приложением сиддхи - person Niveathika; 03.10.2019
comment
Я пробовал использовать версию 2.0.5 и даже самую последнюю версию 2.1.2, но не повезло ... Я получаю ту же ошибку ... Пожалуйста, найдите мое приложение siddhi ниже ... Из-за ограничения количества символов я делюсь каждым из мои стримы отдельным комментарием ... - person Vigneshwaran; 08.10.2019
comment
@info (name = 'Input') из InputStream выберите str: replaceAll (text, '', '% 20') как x, str: concat (str: lower (beta), '_1000001') как имя, str: concat (str: lower (beta), '_1000001_id') как id, str: lower (beta) как beta, str: lower (gamma) как gamma вставить в HitAlphaStream; - person Vigneshwaran; 08.10.2019
comment
@sink (type = 'http-call' ,ink.id = AlphaStream, publisher.url = '10.236 .220.136: 5071 / parse? X = {{x}} & name = {{name}} & id = {{id}} ', method =' GET ', headers =' Content-Type: text / plain ', @map (type =' keyvalue ', @payload (x =' {{x}} ', name =' {{name}} ', id =' {{id}} '))) определяют поток HitAlphaStream ( x строка, строка имени, строка идентификатора, строка бета, строка гаммы); - person Vigneshwaran; 08.10.2019
comment
@source (type = 'http-call-response' ,ink.id = 'AlphaStream', @map (type = 'json', @ attributes (event = 'intent', text = 'text', beta = 'trp: beta ', gamma =' trp: gamma '))) @sink (type =' log ') определить поток AlphaResponseStream (строка события, текстовая строка, бета-строка, гамма-строка); - person Vigneshwaran; 08.10.2019
comment
И я попытался обновить версию расширения io-http, заменив существующий файл jar (siddhi-io-http-1.2.0.jar) внутри рабочего контейнера sp новым файлом jar (siddhi-io-http-2.1.2. jar) ... Но я не думаю, что обновление версии происходит таким образом ... Не могли бы вы предложить мне способ добиться этого ... - person Vigneshwaran; 08.10.2019
comment
Прошу прощения за недоразумение, но похоже, что WSO2 SP 4.4.0 поддерживает только Siddhi 4, а версии siddhi-io-http2x несовместимы. Вы должны использовать расширения версии http 1x. Ваш синтаксис правильный, вам просто нужно вернуть типы приемника и источника на http-запрос и http-ответ, и он будет работать. - person Niveathika; 10.10.2019
comment
Мы уже использовали эти типы приемника (http-ответ) и источника (http-запрос). Наша проблема не в потоках запроса и ответа, а в слиянии потока ответа и потока ввода (поток ввода отличается от потока запроса stream, поскольку запрос будет иметь только несколько атрибутов входного потока, которые необходимы для получения ответа). Мы попытались вставить результат этого слияния в выходной поток. Слияние работает нормально, когда мы отправляем 1 событие в секунду, и оно не выполняется. когда мы отправляем 5 или 10 событий / сек. Когда мы отправляем 10 в секунду, только 3 или 4 потока ответа и входные потоки объединяются. Пожалуйста, помогите. - person Vigneshwaran; 10.10.2019
comment
Как обсуждалось в автономном режиме, свойства trp работают для http-запроса и http-ответа. Пожалуйста, отметьте ответ как правильный после того, как попробуете его, - person Niveathika; 10.10.2019

Синтаксис в вашем подходе "все" ключевые слова не совсем правильный. Вы пробовали что-то подобное:

from every (e1 = event1) -> e2=event2[e1.variable == e2.variable]
select e1.variable1, e2.variable1, e2.variable2
insert into outputEvent;

Этот документ может помочь.

person achaudry    schedule 11.10.2019
comment
Хотя вышесказанное верно. В случае использования приемника HTTP-запроса и источника HTTP-ответа мы можем использовать trp: properties, что является эффективным - person Niveathika; 14.10.2019