Как реализовать сервер MQTT с помощью Spring Integration?

Когда я запускаю пример адаптера исходящего канала для MQTT выдает ошибку:

Executing command line: /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -classpath /Users/afarber/src/spring-newbie/MqttOutbound/target/classes:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot/1.4.1.RELEASE/spring-boot-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.1.RELEASE/spring-boot-starter-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.1.RELEASE/spring-boot-autoconfigure-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.1.RELEASE/spring-boot-starter-logging-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/Users/afarber/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/afarber/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/Users/afarber/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-messaging/4.3.3.RELEASE/spring-messaging-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-tx/4.3.3.RELEASE/spring-tx-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/retry/spring-retry/1.1.3.RELEASE/spring-retry-1.1.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-mqtt/4.3.2.RELEASE/spring-integration-mqtt-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar de.afarber.mqttoutbound.MqttJavaApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.4.1.RELEASE)

2016-10-11 21:53:36.811  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : Starting MqttJavaApplication on mba.local with PID 2102 (/Users/afarber/src/spring-newbie/MqttOutbound/target/classes started by afarber in /Users/afarber/src/spring-newbie/MqttOutbound)
2016-10-11 21:53:36.816  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : No active profile set, falling back to default profiles: default
2016-10-11 21:53:36.960  INFO 2102 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:37.724  INFO 2102 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:37.729  INFO 2102 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2016-10-11 21:53:37.933  INFO 2102 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2016-10-11 21:53:37.947  INFO 2102 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2016-10-11 21:53:38.143  INFO 2102 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:38.148  INFO 2102 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.177  INFO 2102 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.592  INFO 2102 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService  'taskScheduler'
2016-10-11 21:53:39.064  INFO 2102 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2016-10-11 21:53:39.077  INFO 2102 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147483648
2016-10-11 21:53:39.078  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.078  INFO 2102 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.mqttOutboundChannel' has 1 subscriber(s).
2016-10-11 21:53:39.079  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.079  INFO 2102 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2016-10-11 21:53:39.079  INFO 2102 --- [           main] ProxyFactoryBean$MethodInvocationGateway : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.079  INFO 2102 --- [           main] GatewayCompletableFutureProxyFactoryBean : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2016-10-11 21:53:39.093  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : Started MqttJavaApplication in 2.962 seconds (JVM running for 3.669)
Exception in thread "main" org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
    at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy40.sendToMqtt(Unknown Source)
    at de.afarber.mqttoutbound.MqttJavaApplication.main(MqttJavaApplication.java:27)
Caused by: org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.checkConnection(MqttPahoMessageHandler.java:180)
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:189)
    at org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:150)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 19 more
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
    ... 2 more
2016-10-11 21:53:39.203  INFO 2102 --- [       Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:39.207  INFO 2102 --- [       Thread-1] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2016-10-11 21:53:39.209  INFO 2102 --- [       Thread-1] ProxyFactoryBean$MethodInvocationGateway : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] GatewayCompletableFutureProxyFactoryBean : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped _org.springframework.integration.errorLogger
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147483648
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.mqttOutboundChannel' has 0 subscriber(s).
2016-10-11 21:53:39.211  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.211  INFO 2102 --- [       Thread-1] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2016-10-11 21:53:39.212  INFO 2102 --- [       Thread-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
------------------------------------------------------------------------
BUILD FAILURE

Неудачный код в MqttJavaApplication.java скопирован ниже:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo"); // THROWS THE ABOVE EXCEPTION
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}

и в любом случае это, похоже, клиент MQTT...

Как реализовать сервер MQTT (брокер) в качестве Bean для Java Spring Integration, с чего начать, пожалуйста?


person Alexander Farber    schedule 11.10.2016    source источник


Ответы (3)


Spring Integration не предоставляет брокера; он предоставляет клиентов для отправки/получения сообщений.

person Gary Russell    schedule 11.10.2016
comment
Гэри, Spring Integration предоставляет HTTP-сервер, поэтому я предполагал, что в конечном итоге будет возможно создание брокера MQTT? - person Alexander Farber; 12.10.2016
comment
Spring Integration не также предоставляет HTTP-сервер; он может работать в веб-контейнере, таком как Tomcat, и таким образом предоставляет конечные точки HTTP. - person Gary Russell; 12.10.2016
comment
Являются ли эти конечные точки HTTP клиентами или они могут быть сервером? (Наверное, очень глупый вопрос от меня) - person Alexander Farber; 12.10.2016
comment
Конечные точки сервера (входящий шлюз, адаптер входящего канала) доступны при работе в веб-контейнере; клиентские конечные точки доступны в любом месте. - person Gary Russell; 12.10.2016

В верхней части сообщения об ошибке говорится:

java.net.ConnectException: Connection refused

Это означает, что на хосте (localhost), который вы указали в качестве брокера, либо не запущен брокер, либо включен брандмауэр, препятствующий подключению.

Маловероятно, что брандмауэр будет блокировать соединения с локальным хостом, поэтому вам нужно проверить, действительно ли работает брокер.

Если у вас нет брокера, вам необходимо установить его, самый простой из них — Mosquitto, доступный здесь:

https://mosquitto.org/download/

Доступны и другие брокеры, список которых можно найти здесь:

https://github.com/mqtt/mqtt.github.io/wiki/servers

person hardillb    schedule 11.10.2016
comment
>How to implement an MQTT server Spring Integration не предоставляет функции брокера; это клиент. - person Gary Russell; 11.10.2016

Вы можете создать bean-компонент MqttPahoMessageHandler в основном классе Spring Boot Application.java.

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { mqttServerUrl });
    options.setUserName(mqttUsername);
    options.setPassword(mqttPassword.toCharArray());
    factory.setConnectionOptions(options);
    return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MqttPahoMessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
    messageHandler.setAsync(true);
    return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

Затем bean-компоненты будут автоматически создаваться при запуске приложения.

Затем для публикации сообщений вы можете реализовать свой собственный сервисный слой по своему усмотрению.

person Akila Wickramarachchi    schedule 10.10.2019