Я могу подключиться к своему брокеру MQTT с привязками Python, но я пытаюсь воспроизвести то же самое с Java, и похоже, что есть успешное рукопожатие SSH, но после отправки сообщения подключения MQTT нет ответа. В чем может быть причина?
Мой код:
package com.daniel.qa.common.services.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import javax.net.ssl.*;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.logging.*;
public class MqttTest {
public static void setDebugLevel(Level newLvl) {
Logger rootLogger = LogManager.getLogManager().getLogger("");
rootLogger.addHandler(new ConsoleHandler());
Handler[] handlers = rootLogger.getHandlers();
rootLogger.setLevel(newLvl);
for (Handler h : handlers) {
h.setLevel(newLvl);
}
}
public static void main(String[] args) {
setDebugLevel(Level.FINEST);
String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 1;
String broker = "ssl://super-car.na.mqtt.daniel.com:8883";
MemoryPersistence persistence = new MemoryPersistence();
try {
// Create a trust manager that does not validate certificate chains
TrustManager[] trustAllCerts = new TrustManager[]{
new X509ExtendedTrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {
}
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {
}
}
};
// Install the all-trusting trust manager
System.setProperty("jdk.tls.client.protocols", "TLSv1");
SSLContext sc = SSLContext.getInstance("TLS");
sc.init(null, trustAllCerts, new java.security.SecureRandom());
MqttClient sampleClient = new MqttClient(broker, "pahoClientId", persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setSocketFactory(sc.getSocketFactory());
System.out.println("Connecting to broker: " + broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
e.printStackTrace();
}
}
}
Журналы на FINEST уровне:
11 января 2018 г. 17:16:31 org.eclipse.paho.client.mqttv3.internal.Token waitForResponse FINE: pahoClientId: сбой с исключением Соединение потеряно (32109) - исключение java.io.EOFException в org.eclipse.paho.client .mqttv3.internal.CommsReceiver.run (CommsReceiver.java:138) в java.lang.Thread.run (Thread.java:748) Вызвано: java.io.EOFException в java.io.DataInputStream.readByte (DataInputStream.java : 267) на org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage (MqttInputStream.java:56) на org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run (Comms100Receiver.java ) ... 1 больше
Соединение потеряно (32109) - исключение java.io.EOFException в org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run (CommsReceiver.java:138) в java.lang.Thread.run (Thread.java:748) Вызвано автор: java.io.EOFException в java.io.DataInputStream.readByte (DataInputStream.java:267) в org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage (MqttInputStream.java .:56) eclipse.paho.client.mqttv3.internal.CommsReceiver.run (CommsReceiver.java:100) ... еще 1
Процесс завершен с кодом выхода 0
Проблема заключалась в версии MQTT - в первой версии Java у меня была более старая библиотека - mqtt-client 0.4.0 - с MQTT 3.1, и брокер не поддерживал ее, поэтому он прерывал SSL-соединение, когда это заметил. Когда я переключился на более новую версию (org.eclipse.paho.client.mqttv3 1.0.2, с MQTT 3.1.1), соединение было успешным.