Как я могу передать объект «Сообщение» на маршрут?

Создаю поток, который потребляет сообщения от RabbitMQ и после этого раздает на соответствующие сервисы по типам с помощью роутера. Методы в сервисах принимают аргумент Message<?>, потому что там мне нужно использовать заголовки. Но в этом методе я получаю только полезную нагрузку сообщения с типом java.lang.String вместо org.springframework.messaging.Message и получаю ошибку java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message.

Полезная нагрузка мне не подходит, так как мне нужно получить заголовки из сообщения.

@Bean
public IntegrationFlow testFlow(String queueName,
                                ConnectionFactory connectionFactory,
                                Service1 service1,
                                Service2 service2) {
    SimpleMessageListenerContainer consumerListener = new SimpleMessageListenerContainer(connectionFactory);
    consumerListener.addQueueNames(queueName);
    return IntegrationFlows.from(Amqp.inboundAdapter(consumerListener))
            .transform(s -> s, ConsumerEndpointSpec::transactional)
            .<Message<?>, String>route(HeadersUtil::getType, m -> m
                    .subFlowMapping(Type.SERVICE_1, sf -> sf.handle(service1::handleProcedure))
                    .subFlowMapping(Type.SERVICE_2, sf -> sf.handle(service2::handleProcedure)))
            .get();
}

Сигнатура метода handleProcedure выглядит следующим образом:

void handleProcedure(Message<?> message)

Я ожидаю получить заголовки Message в методе handleProcedure, но теперь получаю исключение.


person Roman    schedule 05.04.2019    source источник


Ответы (2)


Я думаю, вы не правильно поняли трассировку стека.

Ваш void handleProcedure(Message<?> message) и его ссылка на метод service1::handleProcedure полностью соответствуют сигнатуре метода public B handle(MessageHandler messageHandler) { в файле IntegrationFlowDefinition.

Ваша проблема здесь:

.<Message<?>, String>route(HeadersUtil::getType, 

Ваш HeadersUtil::getType ожидает сообщения, но тип лямбда-вызова — payload, который в вашем случае равен String.

Это должно работать:

.<Message<?>, String>route(Message.class, HeadersUtil::getType, 
person Artem Bilan    schedule 05.04.2019

Версия handle(GenericHandler<P>) .handle, которую вы используете, получает только полезную нагрузку.

Если вы хотите получить полное сообщение, вам нужно использовать другой перегруженный .handle, например handle("service1", "handleProcedure") или .handle(service1, "handleProcedure").

person Gary Russell    schedule 05.04.2019