Потребитель не получает сообщение от MQ, когда сообщение отправляется до того, как потребитель прослушивает

Я впервые использую MQ и пытаюсь реализовать систему ведения журнала с помощью RabbitMQ. Моя реализация включает в себя «отправителя»

/*
 * This class sends messages over MQ
 */
public class MQSender {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String[] levels = {"green", "orange", "red", "black"};
        for (String log_level : levels) {
            String message = "This is a " + log_level + " message";
            System.out.println("Sending " + log_level + " message");
            //publish the message with each of the bindings in levels
            channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
        }

        channel.close();
        connection.close();
    }
}

Который отправляет по одному сообщению для каждого моего цвета на биржу, где цвет будет использоваться в качестве привязки. И это включает в себя «приемник»

public class MQReceiver {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        receiveMessagesFromQueue(2);
    }

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //generate random queue
        String queueName = channel.queueDeclare().getQueue();

        //set bindings from 0 to maxLevel for the queue
        for (int level = 0; level <= maxLevel; level++) {
            channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
        }

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while(true) {
            //waits until a message is delivered then gets that message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

которому дается в качестве параметра число, представляющее, какие цветовые привязки я хотел бы, чтобы он подавался с биржи.

В моей реализации и в RabbitMQ в целом кажется, что сообщения хранятся в обмене до тех пор, пока Consumer не запросит их, после чего они распределяются по соответствующим очередям, а затем отправляются по одному клиенту (или потребителю в жаргон MQ). Моя проблема в том, что когда я запускаю класс MQSender перед запуском класса MQReceiver, сообщения никогда не доставляются. Но когда я сначала запускаю класс MQReceiver, сообщения принимаются. Исходя из моего понимания MQ, я думаю, что сообщения должны храниться на сервере до тех пор, пока не будет запущен класс MQReceiver, а затем сообщения должны быть доставлены их потребителям, однако это не то, что происходит. Мой главный вопрос заключается в том, могут ли эти сообщения храниться в обменнике, и если нет, то где они должны храниться, чтобы они были доставлены после вызова потребителя (т. е. моего класса MQReceiver)?

Спасибо за вашу помощь!


person azrosen92    schedule 22.08.2013    source источник
comment
Просто предположение, но я подозреваю, что ваш Sender отбрасывает сообщение из-за отсутствия зарегистрированных Consumer   -  person StormeHawke    schedule 23.08.2013
comment
у вас может быть autoAck установлено в true? больше информации здесь: rabbitmq.com/tutorials/tutorial-two-java.html< /а>   -  person Alper Akture    schedule 23.08.2013


Ответы (1)


RabbitMQ отбрасывает сообщения, если их ключ маршрутизации не соответствует ни одной из очередей, привязанных к обмену. Когда вы сначала запускаете MQSender, никакие очереди не связаны, поэтому отправляемые сообщения теряются. Когда вы запускаете MQReceiver, он привязывает очереди к обмену, поэтому у RabbitMQ есть место для размещения сообщения от MQSender. Когда вы останавливаете MQReceiver, поскольку вы создали анонимную очередь, очередь и все привязки удаляются из обмена.

Если вы хотите, чтобы сообщения хранились на сервере, пока MQReceiver не работает, вам нужно, чтобы получатель создал именованную очередь и привязал ключи маршрутизации к этой очереди. Обратите внимание, что создание именованной очереди является идемпотентным, и очередь не будет создана, если она уже существует. Затем вам нужно, чтобы получатель вытягивал сообщения из именованной очереди.

Измените свой код, чтобы он выглядел примерно так:

MQSender

....
String namedQueue = "logqueue";
//declare named queue and bind log level routing keys to it.
//RabbitMQ will put messages with matching routing keys in this queue
channel.queueDeclare(namedQueue, false, false, false, null);
for (int level = 0; level < LOG_LEVELS.length; level++) {
   channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]);
}
...

MQReceiver

...
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

QueueingConsumer consumer = new QueueingConsumer(channel);

//Consume messages off named queue instead of anonymous queue
String namedQueue = "logqueue";
channel.basicConsume(namedQueue, true, consumer);

while(true) {
...
person lreeder    schedule 23.08.2013