Paho MQTT cleanSession имеет значение false, но сообщения не принимаются

Я тестировал MQTT для проекта. Я также могу получать сообщения по теме, на которую подписался мой клиент, когда он подключен. Я установил QoS на 1, а cleanSession установлен на false. Но я не могу получать сообщения, которые были отправлены в тему, на которую подписан, при повторном подключении моего клиента. В моем приложении почти всю работу выполняет вспомогательный сервис.

Вот мой код

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?>

<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

<application
    android:allowBackup="true"
    android:icon="@mipmap/ic_launcher"
    android:label="@string/app_name"
    android:theme="@style/AppTheme" >
    <activity
        android:name=".MainActivity"
        android:label="@string/app_name"
        android:screenOrientation="portrait" >
        <intent-filter>
            <action android:name="android.intent.action.MAIN" />

            <category android:name="android.intent.category.LAUNCHER" />
        </intent-filter>
    </activity>

    <service
        android:name=".MqttHelperService"
        android:enabled="true"
        android:exported="true" />

    <!-- MqttService -->
    <service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

MainActivity.java

package com.prateek.mqtttest;

import android.app.Activity;
import android.content.Intent;
import android.os.Bundle;

public class MainActivity extends Activity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        startService(new Intent(getBaseContext(), MqttHelperService.class));
    }
}

MqttHelperService.java

package com.prateek.mqtttest;

import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.widget.Toast;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttHelperService extends Service implements MqttCallback {

    private static final String MQTT_URI = "tcp://broker.mqttdashboard.com:1883";
    private static final String CLIENT_ID = "prateek";
    private static final String MQTT_TOPIC = "mqttmessenger";
    private static final int QOS = 1;
    private MqttAndroidClient client;

    public MqttHelperService() {
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Toast.makeText(this, "MQTT Helper Service Started", Toast.LENGTH_SHORT).show();
        new Thread(new Runnable() {
            @Override
            public void run() {
                connect();
            }
        }, "MqttHelperService").start();
        return START_STICKY;
    }

    public class MqttHelperBinder extends Binder {
        public MqttHelperService getService(){
            return MqttHelperService.this;
        }
    }

    public void connect() {
        client = new MqttAndroidClient(this, MQTT_URI, CLIENT_ID);
        client.setCallback(this);

        try {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            client.connect(options, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Toast.makeText(getBaseContext(), "connected to MQTT broker", Toast.LENGTH_SHORT).show();
                    subscribe();
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Toast.makeText(getBaseContext(), "failed to connect: " + throwable.getMessage(), Toast.LENGTH_SHORT).show();
                }
            });
        } catch (MqttException e) {
            Toast.makeText(this, "could not connect to MQTT broker at " + MQTT_URI, Toast.LENGTH_SHORT).show();
        }
    }

    public void subscribe() {
        try {
            IMqttToken token = client.subscribe(MQTT_TOPIC, QOS, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Toast.makeText(getBaseContext(), "subscription successful", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Toast.makeText(getBaseContext(), "subscription failed: " + throwable, Toast.LENGTH_SHORT).show();
                }
            });

        } catch (MqttException e) {
            Toast.makeText(this, "could not subscribe", Toast.LENGTH_SHORT).show();
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        // TODO: Return the communication channel to the service.
        throw new UnsupportedOperationException("Not yet implemented");
    }

    @Override
    public void connectionLost(Throwable throwable) {
        Toast.makeText(this, "connection lost", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        Toast.makeText(this, "message received on topic " + s, Toast.LENGTH_SHORT).show();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        Toast.makeText(this, "Service Destroyed", Toast.LENGTH_SHORT).show();
    }


}

Я даже проверил эту ссылку Если для флага очистки сеанса установлено значение FALSE, мне не хватает опубликованных значений, но я не могу найти ошибку в моем коде


person Prateek Garg    schedule 25.06.2015    source источник


Ответы (2)


Недавно у меня была такая же проблема. Сейчас. Я думаю, что решение очень простое, но я потратил много часов, чтобы разобраться в нем.

Эта строка была «плохой»:

client.connect(mqttOptions, mqqtActionListener);

«Правильный»:

client.connect(mqttOptions, null, mqqtActionListener);

Если вы вызываете метод connect с двумя параметрами, вы используете этот конструктор:

public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException

Вместо правильного:

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException

Я надеюсь, что это твоя проблема.

person wyzard    schedule 30.07.2015
comment
пожалуйста, помогите мне. stackoverflow.com/questions/36056381/ - person séan35; 17.03.2016
comment
Большое спасибо, я провел очень много часов, осматриваясь, читая, переписывая свой класс обслуживания, меняя брокеров, в конце концов, это была проблема. - person user3564573; 17.05.2016
comment
Я рада, что могу помочь. :) - person wyzard; 03.10.2016

Наиболее вероятная проблема заключается в том, что публикуемые сообщения имеют QoS = 0. И подписка, и публикация должны иметь QoS> 0, чтобы они были поставлены в очередь для постоянного клиента.

Вы подтвердили, что сообщения публикуются с QoS 1, поэтому следующее, что нужно сделать, это протестировать с помощью известных рабочих инструментов, чтобы устранить возможные проблемы с вашим кодом. Я пытался:

mosquitto_sub -i prateek -t mqttmessenger -h broker.mqttdashboard.com -v -d -c -q 1

И подтвердил, что могу получать сообщения, пока он подключен:

mosquitto_pub -q 1 -t mqttmessenger -m hello2 -h broker.mqttdashboard.com

Затем я отключил клиент mosquitto_sub и опубликовал другое сообщение с той же командой mosquitto_pub. Повторное подключение mosquitto_sub той же командой не дало никаких сообщений, в точности как вы видите. Повторяя процедуру, но используя test.mosquitto.org в качестве брокера, поведение соответствует ожидаемому. Похоже, broker.mqttdashboard.com не настроен для поддержки постоянных клиентов.

person ralight    schedule 25.06.2015
comment
Сообщения также публикуются с QoS = 1. - person Prateek Garg; 25.06.2015
comment
Это тоже была моя проблема, тонкость, что сообщения должны быть опубликованы как QoS ›= 1, чтобы подписка QoS› = 1 работала. - person RedShift; 20.09.2019