Как мне получить количество сообщений в очереди Spring Integration с помощью Micrometer?

Метрики Spring и управление: MessageChannel Метрические характеристики описывает:

Если это QueueChannel, вы также видите статистику для операции приема, а также количество сообщений, которые в данный момент буферизированы этим QueueChannel

Тем не мение:

Эти устаревшие метрики будут удалены в следующем выпуске. См. Интеграция микрометра < / а>.

Где это описывает:

Счетчики счетчиков для операций приема на опрашиваемых каналах сообщений имеют следующие имена или теги: name: spring.integration.receive [...]

Похоже, он учитывает только количество полученных сообщений. Количество сообщений в очереди кажется недоступным, даже если вычислить receive - send (потому что нет send).

Итак, с помощью Spring Integration и Micrometer можно ли вообще прочитать размер очереди? Как?


person Michel Jung    schedule 28.07.2020    source источник


Ответы (2)


Я думаю, нам нужно зарегистрировать Gauge для этого значения размера очереди:

/**
 * A gauge tracks a value that may go up or down. The value that is published for gauges is
 * an instantaneous sample of the gauge at publishing time.
 *
 * @author Jon Schneider
 */
public interface Gauge extends Meter {

А вот соответствующий строитель:

/**
 * A convenience method for building a gauge from a supplying function, holding a strong
 * reference to this function.
 *
 * @param name The gauge's name.
 * @param f    A function that yields a double value for the gauge.
 * @return A new gauge builder.
 * @since 1.1.0
 */
@Incubating(since = "1.1.0")
static Builder<Supplier<Number>> builder(String name, Supplier<Number> f) {

Не стесняйтесь поднимать вопрос о GH, чтобы улучшить ситуацию!

На данный момент это, вероятно, возможно через внешний Gauge экземпляр с QueueChannel.getQueueSize() делегированием.

person Artem Bilan    schedule 28.07.2020
comment
Вот исправление: github.com/spring-projects/spring-integration/pull / 3349 - person Artem Bilan; 28.07.2020
comment
Вы, ребята, очень быстрые и отзывчивые, очень признательны! :-) большое спасибо. - person Michel Jung; 28.07.2020
comment
Спасибо за подтверждение, что исправление верное! И спасибо за обзор! - person Artem Bilan; 28.07.2020

Пока не будет выпущена Spring Integration 5.4, это можно использовать:

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.micrometer.MicrometerMetricsCaptor;

import java.util.Collection;

@Configuration
@RequiredArgsConstructor
public class IntegrationMetricConfig {

  private final ApplicationContext applicationContext;
  private final Collection<QueueChannel> queues;

  /**
   * Temporary solution until <a href="https://github.com/spring-projects/spring-integration/pull/3349/files">Add
   * gauges for queue channel size</a> is released. Remove this when updating to Spring Integration
   * 5.4.
   */
  @Autowired
  public void queueSizeGauges() {
    MetricsCaptor metricsCaptor = MicrometerMetricsCaptor.loadCaptor(this.applicationContext);
    queues.forEach(queue -> {
      metricsCaptor.gaugeBuilder("spring.integration.channel.queue.size", queue,
        obj -> queue.getQueueSize())
        .tag("name", queue.getComponentName() == null ? "unknown" : queue.getComponentName())
        .tag("type", "channel")
        .description("The size of the queue channel")
        .build();

      metricsCaptor.gaugeBuilder("spring.integration.channel.queue.remaining.capacity", this,
        obj -> queue.getQueueSize())
        .tag("name", queue.getComponentName() == null ? "unknown" : queue.getComponentName())
        .tag("type", "channel")
        .description("The remaining capacity of the queue channel")
        .build();
    });
  }
}
person Michel Jung    schedule 29.07.2020