Я впервые использую MQ и пытаюсь реализовать систему ведения журнала с помощью RabbitMQ. Моя реализация включает в себя «отправителя»
/*
* This class sends messages over MQ
*/
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
}
channel.close();
connection.close();
}
}
Который отправляет по одному сообщению для каждого моего цвета на биржу, где цвет будет использоваться в качестве привязки. И это включает в себя «приемник»
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
receiveMessagesFromQueue(2);
}
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
которому дается в качестве параметра число, представляющее, какие цветовые привязки я хотел бы, чтобы он подавался с биржи.
В моей реализации и в RabbitMQ в целом кажется, что сообщения хранятся в обмене до тех пор, пока Consumer
не запросит их, после чего они распределяются по соответствующим очередям, а затем отправляются по одному клиенту (или потребителю в жаргон MQ). Моя проблема в том, что когда я запускаю класс MQSender
перед запуском класса MQReceiver
, сообщения никогда не доставляются. Но когда я сначала запускаю класс MQReceiver
, сообщения принимаются. Исходя из моего понимания MQ, я думаю, что сообщения должны храниться на сервере до тех пор, пока не будет запущен класс MQReceiver
, а затем сообщения должны быть доставлены их потребителям, однако это не то, что происходит. Мой главный вопрос заключается в том, могут ли эти сообщения храниться в обменнике, и если нет, то где они должны храниться, чтобы они были доставлены после вызова потребителя (т. е. моего класса MQReceiver
)?
Спасибо за вашу помощь!
Sender
отбрасывает сообщение из-за отсутствия зарегистрированныхConsumer
- person StormeHawke   schedule 23.08.2013