Как использовать все сообщения, необходимые в Spring IntegrationFlow, когда количество сообщений превышает количество одновременных потребителей?

У меня есть поток интеграции, определенный следующим образом:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> a.correlationExpression("payload.entityId")
                                    .releaseExpression("size() eq iterator().next().payload.batchSize")
                                    .sendPartialResultOnExpiry(true)
                                    .groupTimeout(2000)
                                    .expireGroupsUponCompletion(true)
                                    .outputProcessor(myMessageGroupProcessor))
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

Чья цель состоит в том, чтобы сгруппировать несколько связанных сообщений, которые отправляются в «пакете». Вот пример:

// Message 1
{ "name": "Message1", 
  "entityId": "someId"
  "batchSize": 2,
  "batchIndex": 1, 
  .... }

// Message 2
{ "name": "Message2",
  "entityId": "someId"
  "batchSize": 2,
  "batchIndex": 2, 
  .... }

По причинам, описанным здесь, мы используя ручное подтверждение для RabbitMQ, чтобы избежать потери сообщений.

Поток интеграции отлично работает для пакетов размером 2, но как только в пакете больше 2 сообщений, мы сталкиваемся с проблемами:

[my-service] 2017-12-04 17:46:07.966  INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2 
[my-service] 2017-12-04 17:46:09.976  INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3

Обратите внимание, что время между зарегистрированными сообщениями составляет примерно 2 секунды (то есть то, что мы настроили как groupTimeout).

Я подозреваю, что причина этого в том, что Spring потребляет 2 сообщения (которые не подтверждаются автоматически), затем агрегация ожидает 3-го сообщения (поскольку batchSize в этом случае равно 3). Но это сообщение никогда не будет использовано в течение 2-секундного окна, поскольку существует только два одновременных потребителя.

Увеличение числа concurrentConsumers до 3 решает эту проблему. Проблема в том, что мы не знаем размер пакетов, которые мы получаем, и они могут быть довольно большими, например, размером 50 или около того. Это означает, что простое увеличение concurrentConsumers не является приемлемым вариантом.

Каков правильный способ справиться с этим весной?


person Johan    schedule 05.12.2017    source источник


Ответы (1)


Как я уже говорил в комментариях к этому ответ...

При использовании этого шаблона concurrency * prefetch должен быть достаточно большим, чтобы содержать сообщения для всех ожидающих пакетов.

По этой причине я не поддерживаю использование шаблона, если только у вас нет достаточно предсказуемых данных.

person Gary Russell    schedule 05.12.2017
comment
Спасибо, но вы предполагаете, что есть другой шаблон, который решает проблему агрегации + безопасности (т.е. гарантированно не потеряет сообщения при сбое)? Если это так, я был бы очень признателен за любые указатели на это. - person Johan; 06.12.2017
comment
Если вы можете быть уверены, что concurrency * prefetch достаточно велик для всех ситуаций, все будет в порядке. Если вы не можете этого предсказать, вы попадете в тупик. Вы можете справиться с этим, используя групповой тайм-аут и отклоняя отброшенные сообщения, чтобы они были доставлены повторно. Альтернативой является использование постоянного хранилища сообщений с агрегатором. - person Gary Russell; 06.12.2017