Весеннее сообщение о приоритете AMQP

Приоритеты сообщений в очередях в RabbitMQ. Он работает с Java-клиентом, предоставленным rabbitmq. Но это не работает с зависимостью spring-rabbit. Пожалуйста, посмотрите.

  • Версия сервера RabbitMQ - 3.6.5
  • Эрланг - OTP 19 (8.0)

Использование Java-клиента RabbitMQ
Pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.springframework.samples</groupId>
    <artifactId>RabbitMQ</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <developers>
        <developer>
            <name>Sagar Rout</name>
        </developer>
    </developers>

    <properties>
        <!-- Generic properties -->
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Spring -->
        <spring-framework.version>4.3.2.RELEASE</spring-framework.version>
    </properties>

    <dependencies>
        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

        <!-- Spring AMQP -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.6.1.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>  

Publisher.java

public class Publisher {

private final static String QUEUE_NAME = "S1_Priority";

public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-max-priority", 10);
    channel.queueDeclare(QUEUE_NAME, false, false, false, args);
    String message = "Hello World!";

    for (int i = 0; i < 10; i++) {
        channel.basicPublish("", QUEUE_NAME,
                new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(),
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'" + "priority" + i);
    }
    channel.close();
    connection.close();
}}  

Consumer.Java

public class Consumer {

private final static String QUEUE_NAME = "S1_Priority";

public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-max-priority", 10);
    channel.queueDeclare(QUEUE_NAME, false, false, false, args);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'" + properties.getPriority());
        }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
}}  

Это работает, и появляется сообщение с более высоким приоритетом. Но с Spring-rabbit это не работает. Найдите код.
RabbitMQConfig.class

@Configuration
@ComponentScan(basePackages = { "com.blackocean.*" })
@PropertySource("classpath:config.properties")
public class RabbitMQConfig {

@Value("${rabbitmq.host}")
private String host;

@Value("${rabbitmq.port}")
private Integer port;

@Value("${rabbitmq.username}")
private String username;

@Value("${rabbitmq.password}")
private String password;

@Value("${rabbitmq.connection.size}")
private Integer connectionSize ;

@Bean
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
    return new PropertySourcesPlaceholderConfigurer();
}

@Bean
public ConnectionFactory connectionFactory() {

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setHost(host);
    cachingConnectionFactory.setPort(port);
    cachingConnectionFactory.setUsername(username);
    cachingConnectionFactory.setPassword(password);
    cachingConnectionFactory.setConnectionLimit(connectionSize);

    return cachingConnectionFactory;
}

@Bean
public RabbitAdmin rabbitAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
    return new RabbitTemplate(connectionFactory());
}

@Bean
public Queue queue() {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-priority", 10);
    Queue queue = new Queue("myQueue", true, false, false, args) ; 
    return queue ;
}}  

SendUsingJavaConfig

public class Send1UsingJavaConfig {

/**
 * @param args
 */
public static void main(String[] args) {

    ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                 message.getMessageProperties().setPriority(9);
                return message;
            }
        });
    }
}  

ReceiveusingJavaConfig

public class RecvUsingJavaConfig {

public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

    // Basic Example
    String message = (String) rabbitTemplate.receiveAndConvert("myQueue");
    System.out.println(message);
}}  

Config.properties

#RabbitMQ
rabbitmq.host=localhost
#Always provide port and connection size in numbers
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.connection.size=100

Теперь я отправляю сообщение с другим приоритетом, но оно всегда получает сообщение в указанном порядке. Любое предложение будет отличным !!!


person Sagar Rout    schedule 11.09.2016    source источник


Ответы (3)


Просто догадываюсь, я попытался заглянуть в старую библиотеку AMQP, которую использовал (очередь приоритетов в более старой версии Rabbit MQ).

Приоритет был установлен, как показано ниже

args.put("x-max-priority", 10);, он немного отличается от args.put("x-priority", 10);.

Вы можете сослаться на старый репозиторий очереди приоритетов по ссылке. Вы можете попробовать посмотреть, помогает ли это

person Ramachandran.A.G    schedule 11.09.2016
comment
Да; в собственном коде у вас есть args.put("x-max-priority", 10);, но в коде Spring у вас неправильно args.put("x-priority", 10);. - person Gary Russell; 11.09.2016
comment
Спасибо вам обоим. Вчера попробовал x-max-priority, ничего не вышло. Сегодня, когда я поменял, теперь работает. Я не знаю, чего не хватало в то время. Спасибо чувак. :) - person Sagar Rout; 12.09.2016
comment
@Sagar Rout: это изменение будет работать для новых очередей (созданных заново), и x-max-priority не может применяться к очередям, которые уже созданы. - person Santhosh_Reddy; 10.09.2018

  1. Очередь должна иметь аргумент «x-max-priority».
  2. При публикации messageProperties.priority не должно быть 0.
  3. При использовании spring -boot amqp важно установить

    spring.rabbitmq.listener.simple.prefetch=1

В противном случае spring -boot получает 250 сообщений, абсолютно игнорируя приоритеты.

person Dmytro Verbivskyi    schedule 19.07.2019

Если у кого-то есть аналогичные требования к приоритету сообщений, вам необходимо определить приоритет (класс конфигурации) до создания очереди. Если вы планируете применить конфиг для существующих очередей, это не сработает (из моего тестирования).

@Value("${myApp.rabbitmq.queue}")
private String queueName;

@Bean
Queue queue(){
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-max-priority", 10);
    Queue queue = new Queue(queueName, true, false, false, args) ;
    return queue ;
}

Когда вы помещаете сообщения в очередь, убедитесь, что приоритет не превышает 10, поскольку мы определили максимальный приоритет очереди как 10.

BR, Santhosh

person Santhosh_Reddy    schedule 10.09.2018