Встроенная фильтрация сообщений Grails RabbitMQ для потребителя

Я использую версию grails 3.2.3 и версию rabbitmq native plugin 3.3.2 (http://budjb.github.io/grails-rabbitmq-native/doc/manual/). Я пытаюсь реализовать следующий сценарий. введите описание изображения здесь
Описание: я отправляю несколько сообщений в одну очередь с заголовками, а в разделе потребителя я пытался применить привязку к потреблению сообщения с помощью специфическая фильтрация. Но потребитель потребляет все сообщения вне зависимости от фильтрации — значит, привязка не работает. Также я начинаю работу с rabbitmq. Поэтому любая помощь/направление очень ценится. Ниже мой код.

Конфигурация очереди в application.groovy:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
]

}

Функция отправки в очередь:

protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName
        body = message
        autoConvert = true
        if (headers != null) {
            headers = binding
        }
    }
}

Здесь, в sendToQueue, я сделал третий параметр необязательным, так как в некоторых случаях мне не нужны несколько типов потребителей;

Вызов отправки в очередь:

sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])

Потребитель 1:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetEmailConsumer consumer")
    println(message)
    passwordResetEmailService.sendPasswordResetMail(message)
}

Потребитель 2:

static rabbitConfig = [
        queue   : QueueType.EMAIL_QUEUE.queueName,
        binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
        match   : "all",
        consumer: 10
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetSuccessEmailConsumer consumer")
    println(message)
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}

person Mamun Sardar    schedule 20.12.2016    source источник


Ответы (1)


Прочитав документацию по rabbitmq, я понял, что невозможно выборочно извлекать сообщения из одной очереди.

Потребитель получает все сообщения из очереди

Хотя есть и другой вариант "Exchange", когда издатель будет публиковать сообщения для обмена с ключом маршрутизации, и эти сообщения будут доставлены в связанные очереди. Подробнее: Модель публикации/подписки RabbitMQ
Основная идея заключается в следующем. также описано здесь: Stackoverflow: RabbitMQ выборочно извлекает сообщения из очереди
Во всяком случае, в моем решении я не хотел несколько очередей. Поэтому я создал одного потребителя и передал фактическую ссылку на bean-компонент класса обработчика с сообщением для отправки сообщения. Делюсь реализацией, надеюсь, это кому-то поможет:

Конфигурация очереди в application.groovy:

rabbitmq {
    queues = [
        [
                name      : "mail.queue",
                connection: "defaultConnection",
                durable   : true
        ]
    ]
}

Функция отправки в очередь:

protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
    message.queueHandlerServiceClass = queueHandlerServiceClass.name
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName // queue name from enum: "mail.queue"
        body = message
        autoConvert = true
    }
}

Интерфейс обработчика:

interface BaseQueueHandler {
    void handleMessage(Map message, MessageContext context)
}

Отправка в очередь:

sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)

Потребитель в очереди:

class EchoEmailQueueConsumer {

    static rabbitConfig = [
            queue   : QueueType.ECHO_EMAIL_QUEUE.queueName,
            consumer: 10
    ]

    GrailsApplication grailsApplication

    def handleMessage(Map message, MessageContext context) {
        String handlerClass = message.remove("queueHandlerServiceClass")
        Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass);
        BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
        queueService.handleMessage(message, context)
    }

}

Наконец, служба Handler, которая реализует интерфейс Handler:

class PasswordResetEmailService implements BaseQueueHandler {

    @Override
    void handleMessage(Map message, MessageContext context) {
        println("message received in PasswordResetEmailService")
    }
}
person Mamun Sardar    schedule 26.12.2016