Ответить на запрос MINA после объединения нескольких запросов в Camel

Я пытаюсь реализовать минную службу, в которой ответ на последнее сообщение должен основываться на предыдущих сообщениях. Каждое сообщение (заголовок (1), данные (n), конец (1)) должно получить ответ, но ответ на сообщение «конец» должен быть основан на «заголовке», а также на любых полученных сообщениях «данные». как «конец» сообщения. В настоящее время я направляю сообщения в агрегатор, который завершает работу, когда находит «заголовок» и «конец» сообщения для определенного идентификатора корреляции. К сожалению, ответ отправляется до (или одновременно?) сообщения, отправленного агрегатору, поэтому у меня нет доступа к агрегированному сообщению (которое содержит все данные, необходимые для построения правильного ответа), когда построение ответа.

Есть ли способ сделать это без ручного хранения и доступа к накопленным данным (то есть без повторной реализации агрегатора верблюда)?

Редактировать:

Маршрут примерно такой:

<camelContext>
    <route>
        <from uri="mina:..."/>
        <process ref="messageProcessor"/>
        <aggregate>
            <process ref="completeMessageProcessor"/>
        </aggregate>
    </route>
</camelContext>

Я пропустил некоторые теги и атрибуты (correlationExpression, completionPredicate, strategyRef и т. д.) для ясности.

Сообщения правильно агрегировались и правильно обрабатывались при «завершении» (то есть при агрегировании). Но ответ, отправленный обратно через конечную точку mina клиенту, был сгенерирован messageProcessor, а не completeMessageProcessor.

Например (и да, это довольно надуманный пример, но потерпите меня), предположим, что протокол включает в себя отправку клиентом сообщения заголовка, которое включает общее количество сообщений данных, которые он ожидает отправить. Затем он отправляет ряд сообщений с данными, число которых может отличаться от ожидаемого. Наконец, он отправляет нижний колонтитул или завершающее сообщение. Затем сервер должен ответить разницей между ожидаемым количеством сообщений и фактическим количеством сообщений. При указанном маршруте это невозможно, так как количество сообщений неизвестно messageProcessor, который обрабатывает только отдельные сообщения. completeMessageProcessor, имея агрегированное сообщение (состоящее из заголовка, всех данных и конца), действительно знает этот номер, но ответ, созданный в этой точке, не распространяется обратно на конечную точку mina.

Изменение анализа сообщений для создания сообщения только при получении всего составного сообщения не является вариантом, поскольку сервер должен отвечать на отдельные сообщения.


person Tonio    schedule 06.08.2012    source источник
comment
опубликуйте свой маршрут ... не уверен, что вы подразумеваете под ответом, который отправляется до (или в то же время?), что сообщение отправляется агрегатору ...   -  person Ben ODay    schedule 09.08.2012


Ответы (1)


сверху, я предполагаю, что messageProcessor настраивает сообщение OUT, но completeMessageProcessor настраивает сообщение IN. Вместо этого ответ потребителя mina ожидает/использует сообщение OUT.

вы можете добавить логирование, чтобы убедиться в этом. если это так, то вы можете изменить свой messageProcessor, чтобы вместо этого использовать тело IN (или использовать заголовки обмена) и добавить преобразование после вашего completeMessageProcessor, чтобы установить тело OUT на основе тела IN.

<transform>
  <simple>${in.body}</simple>
</transform>

см. дополнительную информацию: http://camel.apache.org/using-getin-or-getout-methods-on-exchange.html

ОБНОВЛЕНИЕ: после некоторого обсуждения реальная проблема заключается в том, что агрегатор в настоящее время обрабатывает только обмены «InOnly».

person Ben ODay    schedule 11.08.2012
comment
Это было именно так. Спасибо! Я невольно зависел от того, что верблюд по умолчанию распространяет сообщение IN, если не установлено сообщение OUT, и был сбит с толку тем фактом, что обработка <bean...> устанавливает сообщение OUT (вместо того, чтобы вернуться к распространению сообщения IN). Таким образом, теги <bean...>, которые я использовал до агрегатора, правильно устанавливали сообщение OUT, поэтому я получал ответ там, а не после агрегатора. - person Tonio; 13.08.2012
comment
круто... рад, что смог помочь... я делал именно это много раз - person Ben ODay; 13.08.2012
comment
Хрм. Видимо ошибся, не работает. Ответ, отправляемый обратно, по-прежнему рассчитывается в messageProcessor, а не в completeMessageProcessor, даже несмотря на то, что я явно устанавливаю там тело сообщения OUT. На самом деле, если я не установлю сообщение OUT в messageProcessor, ответ, отправленный обратно, будет null. - person Tonio; 14.08.2012
comment
После дальнейшего изучения выясняется, что AggregationStrategy получает не исходный Exchange, а скорее копию. Я могу убедиться в этом, проверив идентификаторы бирж в разных точках. На протяжении всей цепочки, вплоть до непосредственно перед агрегатором, идентификатор биржи остается прежним. Обмен передан в AggregationStrategy, поскольку newExchange имеет другой идентификатор. Учитывая это, неудивительно, что ответ, отправляемый обратно на конечную точку мина, не является ответом, сгенерированным агрегатором, поскольку он находится на другой бирже. Похоже, мне не повезло. :( - person Tonio; 14.08.2012
comment
Я понимаю вашу точку зрения, агрегатор создает новый Exchange, предназначенный для нижестоящей конечной точки, а не в качестве ответа. Тем не менее, я считаю, что вы можете использовать шаблон обогащения контента вместе с отдельным маршрутом для вашего агрегатора, чтобы украсить первичный обмен его результатом... - person Ben ODay; 14.08.2012
comment
Как это сработает? wiretap в агрегатор, вывод из агрегатора в, скажем, direct конечную точку, и enrich обмен сразу после wiretap? Я попробую это и отчитаюсь. - person Tonio; 14.08.2012
comment
Мех, изменились приоритеты проекта, поэтому мне пришлось прекратить работу над этим вопросом. Однако до этого я не мог успешно использовать enrich или pollEnrich таким образом. По сути, я не мог понять, куда направить вывод агрегатора, чтобы получить его с помощью pollEnrich (пробовал конечные точки direct и seda, обе заблокированы при попытке обогащения). enrich не казался хорошим совпадением из-за его механики Producer (т. е. куда пойдет маршрут после <from uri="direct:enrich"/>?). - person Tonio; 14.08.2012
comment
после некоторого копания я вижу проблему, агрегатор в настоящее время обрабатывает только обмены InOnly ... не уверен, что на данный момент это можно обойти - person Ben ODay; 15.08.2012
comment
Ну, я думаю, это все. Я снова отмечу это как ответ. Спасибо, что нашли время, чтобы помочь с этим! - person Tonio; 15.08.2012