Приложение My Spring Integration потребляет сообщения от RabbitMQ, преобразует их в сообщение SOAP и выполняет запрос веб-службы.
Из очереди можно получать много (10-50) сообщений в секунду. Или после перезапуска приложения в очереди RabbitMQ может оказаться много тысяч сообщений.
Каков наилучший сценарий для обработки до 10 сообщений в параллельных потоках (упорядочение сообщений - это хорошо, но не является обязательной функцией, если веб-служба отвечает с ошибкой бизнеса, то сообщение о сбое должно быть отправлено для повторной попытки до тех пор, пока оно не будет успешным).
Прослушиватель Amqp не должен потреблять больше сообщений из очереди, поскольку незанятые потоки доступны в исполнителе задач. Я мог бы определить ThreadExecutor в канале следующим образом:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue);
}
IntegrationFlow integrationFlow = IntegrationFlows
.from(amqpInboundChannelAdapter)
.channel(c -> c.executor(exportFlowsExecutor))
.transform(businessObjectToSoapRequestTransformer)
.handle(webServiceOutboundGatewayFactory.getObject())
.get();
Или достаточно определить исполнителя задачи в AmqpInboundChannelAdapter следующим образом и не определять исполнителя задачи каналов в определении потока:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.taskExecutor(taskExecutor));
}
Или, возможно, определить исполнителя задачи для канала, такого как вариант 1, но дополнительно установить maxConcurrentConsumers на адаптере канала следующим образом:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.maxConcurrentConsumers(10));
}