Интеграция Spring: TaskExecutor и MaxConcurrentConsumers на AmqpInboundChannelAdapter

Приложение My Spring Integration потребляет сообщения от RabbitMQ, преобразует их в сообщение SOAP и выполняет запрос веб-службы.

Из очереди можно получать много (10-50) сообщений в секунду. Или после перезапуска приложения в очереди RabbitMQ может оказаться много тысяч сообщений.

Каков наилучший сценарий для обработки до 10 сообщений в параллельных потоках (упорядочение сообщений - это хорошо, но не является обязательной функцией, если веб-служба отвечает с ошибкой бизнеса, то сообщение о сбое должно быть отправлено для повторной попытки до тех пор, пока оно не будет успешным).

Прослушиватель Amqp не должен потреблять больше сообщений из очереди, поскольку незанятые потоки доступны в исполнителе задач. Я мог бы определить ThreadExecutor в канале следующим образом:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue);
}

IntegrationFlow integrationFlow = IntegrationFlows
  .from(amqpInboundChannelAdapter)
  .channel(c -> c.executor(exportFlowsExecutor))
  .transform(businessObjectToSoapRequestTransformer)
  .handle(webServiceOutboundGatewayFactory.getObject())
  .get();

Или достаточно определить исполнителя задачи в AmqpInboundChannelAdapter следующим образом и не определять исполнителя задачи каналов в определении потока:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.taskExecutor(taskExecutor));
}

Или, возможно, определить исполнителя задачи для канала, такого как вариант 1, но дополнительно установить maxConcurrentConsumers на адаптере канала следующим образом:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.maxConcurrentConsumers(10));
}

person Roman    schedule 28.08.2018    source источник


Ответы (1)


Лучше всего настроить concurrency на ListenerContainer и позволить всем последующим процессам выполняться в этих потоках из контейнера. Таким образом, вы получите естественное противодействие, когда из очереди больше не будут опрашиваться сообщения, потому что поток занят. И, с другой стороны, не будет потери сообщения только потому, что с ExecutorChannel после контейнера слушателя вы собираетесь освободить поток опроса, и текущее сообщение будет подтверждено как использованное, но вы можете потерпеть неудачу вниз по течению.

person Artem Bilan    schedule 28.08.2018
comment
Спасибо Артему за ответ. Знаете ли вы какой-нибудь URL-адрес с примером кода, который настраивает клиентское приложение rabbitmq, использующее более одной очереди с функциями параллелизма и повторных попыток? - person Roman; 29.08.2018
comment
Amqp.inboundAdapter() может принимать несколько очередей. Параллелизм - это вопрос configureContainer(). И повторить попытку можно с помощью adviceChain(Advice... adviceChain) в упомянутом configureContainer(). Как настроить и соответствующие рекомендации по повторной попытке, вы можете прочитать в документации: docs.spring.io/spring-amqp/docs/2.0.5.RELEASE/reference/html/ - person Artem Bilan; 29.08.2018