Использование очереди на основе количества потребителей в Spring AMQP

Я хочу, чтобы очередь использовалась только одним подписчиком одновременно. Таким образом, если один подписчик откажется, то у другого будет возможность подписаться.

Я ищу правильный способ сделать это в Spring AMQP. Я сделал это на чистой Java, основываясь на примере на сайте RabbitMQ. Я пассивно объявляю очередь, проверяю количество ее потребителей, если оно равно 0, то начинаю ее использовать.

Вот код.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

int count = channel.queueDeclarePassive(QUEUE_NAME).getConsumerCount();

System.out.println("count is "+count);
if (count == 0) {
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} else{
    System.out.println("subscribed by some other processor(s)");
}

Я также могу проверить количество подписчиков в Spring AMQP таким образом. Но уже поздно, потому что он уже слушает очередь.

@RabbitListener(queues = "q1")
public void receivedMessageQ1(String message, Channel channel){
    try {
        int q1 = channel.queueDeclarePassive("q1").getConsumerCount();
        // do something.
    } catch (IOException e) {
        System.out.println("exception occurred");
    }
}

Вкратце, я хочу использовать очередь в зависимости от количества ее потребителей. Надеюсь, я понял.


person Mansur Gulami    schedule 27.05.2019    source источник


Ответы (1)


Установите флаг exclusive на @RabbitListener; RabbitMQ позволит использовать только один экземпляр. Другой экземпляр (-и) будет пытаться прослушивать каждые 5 секунд (по умолчанию). Чтобы увеличить интервал, установите recoveryBackOff фабрики контейнеров.

@SpringBootApplication
public class So56319999Application {

    public static void main(String[] args) {
        SpringApplication.run(So56319999Application.class, args);
    }

    @RabbitListener(queues = "so56319999", exclusive = true)
    public void listen (String in) {

    }

    @Bean
    public Queue queue() {
        return new Queue("so56319999");
    }

}
person Gary Russell    schedule 28.05.2019
comment
Итак, если текущий экземпляр упадет, смогут ли остальные прослушивать? Имеет ли это отношение к эксклюзивным очередям? Поскольку эксклюзивные очереди удаляются при закрытии соединения. - person Mansur Gulami; 28.05.2019
comment
Когда активный потребитель умирает, другой экземпляр начинает потреблять (исключительно). Нет, эксклюзивные потребители не имеют ничего общего с эксклюзивными очередями. - person Gary Russell; 28.05.2019