Получение текущего количества сообщений в кольцевом буфере

Я использую шаблон Spring Reactor в своем веб-приложении. Внутри он использует реализацию RingBuffer LMAX в качестве одной из очередей сообщений. Мне было интересно, есть ли способ узнать текущую занятость RingBuffer динамически. Это помогло бы мне определить количество необходимых производителей и потребителей (а также их относительные скорости) и оптимально ли использовать RingBuffer в качестве очереди сообщений. Я попробовал getBacklog () класса response.event.dispatch.AbstractSingleThreadDispatcher, но, похоже, он всегда дает одно и то же значение: размер RingBuffer, который я использовал при создании экземпляра реактор.
Мы будем благодарны за любой свет по проблеме.


person mahasamatman    schedule 16.10.2014    source источник


Ответы (2)


Используйте com.lmax.disruptor.Sequencer.remainingCapacity () Чтобы получить доступ к экземпляру Sequencer, вы должны создать его явно, а также RingBuffer .

В моем случае инициализация выходящего Disruptor

Disruptor<MessageEvent> outcomingDisruptor = 
    new Disruptor<MessageEvent>(
        MyEventFactory.getInstance(),
        RING_BUFFER_OUT_SIZE, 
        MyExecutor.getInstance(), 
        ProducerType.SINGLE, new BlockingWaitStrategy());

превращается в

this.sequencer = 
    SingleProducerSequencer(RING_BUFFER_OUT_SIZE, new BlockingWaitStrategy());
RingBuffer ringBuffer = 
    new RingBuffer<MessageEvent>(MyEventFactory.getInstance(), sequencer);
Disruptor<MessageEvent> outcomingDisruptor = 
    new Disruptor<MessageEvent>(ringBuffer, MyExecutor.getInstance());

а потом

this.getOutCapacity() {
    return sequencer.remainingCapacity();
}

ОБНОВИТЬ

небольшая ошибка: | нам нужен outMessagesCount вместо getOutCapacity.

public long outMessagesCount() {
    return RING_BUFFER_OUT_SIZE - sequencer.remainingCapacity();
}
person VMykyt    schedule 16.10.2014
comment
но что, если вам нужно следить за нарушителем remainingCapacity внутри другого фреймворка, где sequencer не виден? например проблема связана с асинхронными регистраторами log4j2, основанными на дезинтеграторе: насколько я исследовал, нет действительного способа получить sequencer туда. - person yetanothercoder; 06.03.2015
comment
Похоже, один из вариантов - просто использовать общедоступный ringBuffer.remainingCapacity (). - person Dan Torrey; 06.02.2019
comment
@yetanothercoder есть ли способ в log4j2 выставить оставшуюся емкость как bean-компонент? - person Gaurav; 22.07.2019

Последняя версия активной зоны реактора в mvnrepository (версия 1.1.4 RELEASE) не имеет возможности динамически отслеживать состояние очереди сообщений. Однако, просмотрев код реактора на github, я обнаружил TraceableDelegatingDispatcher , который позволяет отслеживать очередь сообщений (если поддерживается базовой реализацией диспетчера) во время выполнения с помощью метода remainingSlots(). Самым простым вариантом было скомпилировать исходный код и использовать его.

person mahasamatman    schedule 17.10.2014