Android Mosquitto - состояние подключения клиента

У меня есть клиент MQTT, работающий на Android, и брокер MQTT на сервере. Моя проблема в том, где мы будем использовать приложение, у которого пропадают некоторые соединения, поэтому моему веб-приложению необходимо знать текущее состояние клиента.

Итак, что мы делаем прямо сейчас: 1 - Сервер отправляет случайное число клиентам (каждый клиент получит другое случайное число) 2 - Клиент Android получает номер и отправляет его веб-сервису 3 - Веб-сервис пишет в SQL db 4 - сервер ждет 4 секунды ответа от клиента Android, и если случайное число, отправленное сервером, == числу в базе данных, клиент подключен.

Но теперь проблема в том, что когда многопользовательские пользователи отправляют случайное число, единственное, которое будет записано в базу данных, является последним, поэтому это огромная ошибка дизайна.

Чтобы исправить, единственное хорошее решение - получить прямой ответ от клиента MQTT и быть уникальным для каждого клиента, но я не знаю, возможно ли это или это лучший способ.

Некоторые рисуют, чтобы лучше понять: Flow

Вот мой код для Android:

public class MQTTService extends Service implements MqttCallback {

public static final String DEBUG_TAG = "MqttService"; // Debug TAG

private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]"; // Handler
                                                                                    // Thread
                                                                                    // ID

private String MQTT_BROKER = ""; // Broker URL
                                    // or IP
                                    // Address
private static final int MQTT_PORT = 1883; // Broker Port

public static final int MQTT_QOS_0 = 0; // QOS Level 0 ( Delivery Once no
                                        // confirmation )
public static final int MQTT_QOS_1 = 1; // QOS Level 1 ( Delivery at least
                                        // Once with confirmation )
public static final int MQTT_QOS_2 = 2; // QOS Level 2 ( Delivery only once
                                        // with confirmation with handshake
                                        // )

private static final int MQTT_KEEP_ALIVE = 30000; // KeepAlive Interval in
                                                    // MS
private static final String MQTT_KEEP_ALIVE_TOPIC_FORMAT = "/users/%s/keepalive"; // Topic
                                                                                    // format
                                                                                    // for
                                                                                    // KeepAlives
private static final byte[] MQTT_KEEP_ALIVE_MESSAGE = { 0 }; // Keep Alive
                                                                // message
                                                                // to send
private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_2; // Default
                                                            // Keepalive QOS

private static final boolean MQTT_CLEAN_SESSION = false; // Start a clean
                                                            // session?

private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; // URL Format
                                                                // normally
                                                                // don't
                                                                // change

public static final String ACTION_START = DEBUG_TAG + ".START"; // Action
                                                                // to
                                                                // start
public static final String ACTION_STOP = DEBUG_TAG + ".STOP"; // Action to
                                                                // stop
public static final String ACTION_KEEPALIVE = DEBUG_TAG + ".KEEPALIVE"; // Action
                                                                        // to
                                                                        // keep
                                                                        // alive
                                                                        // used
                                                                        // by
                                                                        // alarm
                                                                        // manager
private static final String ACTION_RECONNECT = DEBUG_TAG + ".RECONNECT"; // Action
                                                                            // to
                                                                            // reconnect

// private final String DEVICE_ID_FORMAT = "andr_%s"; // Device ID
// Format, add
// any prefix
// you'd like
// Note: There
// is a 23
// character
// limit you
// will get
// An NPE if you
// go over that
// limit
private boolean mStarted = false; // Is the Client started?
private String user_ID; // Device ID, Secure.ANDROID_ID
private Handler mConnHandler; // Seperate Handler thread for networking

private MqttDefaultFilePersistence mDataStore; // Defaults to FileStore
private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore
private MqttConnectOptions mOpts; // Connection Options

private MqttTopic mKeepAliveTopic; // Instance Variable for Keepalive topic

private MqttClient mClient; // Mqtt Client

private AlarmManager mAlarmManager; // Alarm manager to perform repeating
                                    // tasks
private ConnectivityManager mConnectivityManager; // To check for
                                                    // connectivity changes

public static final String TAG_CONNECTED = "CONNECTED";
public static final String TAG_ASSIGNED = "ASSIGNED";
public static final String TAG_REFRESH = "REFRESH";

public String TOPIC_CONNECTED = null;
public String TOPIC_ASSIGNED = null;
public String TOPIC_REFRESH = null;

private Intent intent;

private PendingIntent alarmIntent;

private AppMaintenance appStatus;

/**
 * Initializes the DeviceId and most instance variables Including the
 * Connection Handler, Datastore, Alarm Manager and ConnectivityManager.
 */
@Override
public void onCreate() {
    super.onCreate();

    // mDeviceId = String.format(DEVICE_ID_FORMAT,
    // Secure.getString(getContentResolver(), Secure.ANDROID_ID));

    android.os.Debug.waitForDebugger(); // Debugger
    appStatus = (AppMaintenance) getApplicationContext();

    ExceptionHandler.register(this, appStatus.getException_URL());

    HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);
    thread.start();

    mConnHandler = new Handler(thread.getLooper());

    try {
        mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
    } catch (Exception e) {
        // writeToFile("Exception - onCreate()");
        e.printStackTrace();
        mDataStore = null;
        mMemStore = new MemoryPersistence();
    }

    mOpts = new MqttConnectOptions();
    mOpts.setCleanSession(MQTT_CLEAN_SESSION);
    // Do not set keep alive interval on mOpts we keep track of it with
    // alarm's

    mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
    mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    registerReceiver(mConnectivityReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}

/**
 * Start MQTT Client
 * 
 * @param Context
 *            context to start the service with
 * @return void
 */
public static void actionStart(Context ctx) {
    Intent i = new Intent(ctx, MQTTService.class);
    i.setAction(ACTION_START);
    ctx.startService(i);
}

/**
 * Stop MQTT Client
 * 
 * @param Context
 *            context to start the service with
 * @return void
 */
public static void actionStop(Context ctx) {
    Intent i = new Intent(ctx, MQTTService.class);
    i.setAction(ACTION_STOP);
    ctx.startService(i);
}

/**
 * Send a KeepAlive Message
 * 
 * @param Context
 *            context to start the service with
 * @return void
 */
public static void actionKeepalive(Context ctx) {
    Intent i = new Intent(ctx, MQTTService.class);
    i.setAction(ACTION_KEEPALIVE);
    ctx.startService(i);
}

/**
 * Service onStartCommand Handles the action passed via the Intent
 * 
 * @return START_REDELIVER_INTENT
 */
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    super.onStartCommand(intent, flags, startId);

    this.intent = intent;

    SharedPreferences myPrefs = getSharedPreferences("UserPreferences", MODE_PRIVATE);
    MQTT_BROKER = myPrefs.getString("broker", "");
    user_ID = myPrefs.getString("userID", "");

    String action = intent.getAction();

    TOPIC_CONNECTED = user_ID + "\\" + TAG_CONNECTED;
    TOPIC_ASSIGNED = user_ID + "\\" + TAG_ASSIGNED;
    TOPIC_REFRESH = user_ID + "\\" + TAG_REFRESH;

    Log.i(DEBUG_TAG, "Received action of " + action);
    // writeToFile("Received action of " + action);
    if (user_ID.isEmpty() || user_ID == null)
        action = null;

    if (action == null) {
        Log.i(DEBUG_TAG, "Starting service with no action\n Probably from a crash");
        // writeToFile("Starting service with no action\n Probably from a crash");
        Toast.makeText(getApplicationContext(), getString(R.string.mqtt_warning_userid), Toast.LENGTH_LONG).show();
        action = null;
    } else {
        if (action.equals(ACTION_START)) {
            Log.i(DEBUG_TAG, "Received ACTION_START");
            // writeToFile("Received ACTION_START");
            start();
        } else if (action.equals(ACTION_STOP)) {
            Log.i(DEBUG_TAG, "Received ACTION_STOP");
            // writeToFile("Received ACTION_STOP");
            stop();
        } else if (action.equals(ACTION_KEEPALIVE)) {
            Log.i(DEBUG_TAG, "Received ACTION_KEEPALIVE");
            // writeToFile("Received ACTION_KEEPALIVE");
            keepAlive();
        } else if (action.equals(ACTION_RECONNECT)) {
            Log.i(DEBUG_TAG, "Received ACTION_RECONNECT");
            // writeToFile("Received ACTION_RECONNECT");
            reconnectIfNecessary();
        }
    }

    return START_NOT_STICKY;
}

/**
 * Attempts connect to the Mqtt Broker and listen for Connectivity changes
 * via ConnectivityManager.CONNECTVITIY_ACTION BroadcastReceiver
 */
private synchronized void start() {
    if (mStarted) {
        Log.i(DEBUG_TAG, "Attempt to start while already started");
        // writeToFile("Attempt to start while already started");
        return;
    }

    if (hasScheduledKeepAlives()) {
        stopKeepAlives();
    }

    connect();
}

/**
 * Attempts to stop the Mqtt client as well as halting all keep alive
 * messages queued in the alarm manager
 */
private synchronized void stop() {
    if (!mStarted) {
        Log.i(DEBUG_TAG, "Attemt to stop connection that isn't running");
        // writeToFile("Attemt to stop connection that isn't running");
        return;
    }

    if (mClient != null) {
        mConnHandler.post(new Runnable() {
            @Override
            public void run() {
                try {
                    mClient.disconnect();
                } catch (Exception ex) {
                    // writeToFile("Exception - stop() ");
                    ex.printStackTrace();
                    mClient = null;
                    mStarted = false;
                } finally {
                    mClient = null;
                    mStarted = false;
                    stopKeepAlives();
                }
            }
        });
    }
}

/**
 * Connects to the broker with the appropriate datastore
 */
private synchronized void connect() {
    String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);
    Log.i(DEBUG_TAG, "Connecting with URL: " + url);
    // writeToFile("Connecting with URL: " + url);
    try {
        if (mDataStore != null) {
            Log.i(DEBUG_TAG, "Connecting with DataStore");
            // writeToFile("Connecting with DataStore");
            mClient = new MqttClient(url, user_ID, mDataStore);
        } else {
            Log.i(DEBUG_TAG, "Connecting with MemStore");
            // writeToFile("Connecting with MemStore");
            mClient = new MqttClient(url, user_ID, mMemStore);
        }
    } catch (Exception e) {
        // writeToFile("Exception - connect L.343");
        e.printStackTrace();
    }

    mConnHandler.post(new Runnable() {
        @Override
        public void run() {
            try {

                mClient.connect(mOpts);

                mClient.subscribe(new String[] { TOPIC_CONNECTED, TOPIC_ASSIGNED, TOPIC_REFRESH }, new int[] { MQTT_QOS_0,
                        MQTT_KEEP_ALIVE_QOS, MQTT_KEEP_ALIVE_QOS });

                mClient.setCallback(new MQTTPushCallback(MQTTService.this, intent, user_ID, TOPIC_CONNECTED, TOPIC_ASSIGNED,
                        TOPIC_REFRESH));

                mStarted = true; // Service is now connected

                Log.i(DEBUG_TAG, "Successfully connected and subscribed starting keep alives");
                // writeToFile("Successfully connected and subscribed starting keep alives");
                startKeepAlives();
            } catch (Exception e) {
                // writeToFile("Exception - connect L.366");
                e.printStackTrace();
            }
        }
    });
}

/**
 * Schedules keep alives via a PendingIntent in the Alarm Manager
 */
private void startKeepAlives() {
    Intent i = new Intent();
    i.setClass(this, MQTTService.class);
    i.setAction(ACTION_KEEPALIVE);
    alarmIntent = PendingIntent.getService(this, 0, i, 0);
    mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + MQTT_KEEP_ALIVE, MQTT_KEEP_ALIVE, alarmIntent);
    Log.i(DEBUG_TAG, "Started keepAlives sucessfully");
    // writeToFile("Started keepAlives sucessfully");
}

/**
 * Cancels the Pending Intent in the alarm manager
 */
private void stopKeepAlives() {
    if (mAlarmManager != null) {
        mAlarmManager.cancel(alarmIntent);
    }
}

/**
 * Publishes a KeepALive to the topic in the broker
 */
private synchronized void keepAlive() {
    // if (isForeground()) {
    if (isConnected()) {
        try {
            sendKeepAlive();
            return;
        } catch (MqttConnectivityException ex) {
            // writeToFile("Exception - KeepAlive() 1");
            ex.printStackTrace();
            reconnectIfNecessary();
        } catch (MqttPersistenceException ex) {
            // writeToFile("Exception - KeepAlive() 2");
            ex.printStackTrace();
            stop();
            restartService();

        } catch (MqttException ex) {
            // writeToFile("Exception - KeepAlive() 3");
            ex.printStackTrace();
            stop();
            restartService();

        } catch (Exception ex) {
            // writeToFile("Exception - KeepAlive() 4");
            ex.printStackTrace();
            stop();
            restartService();
        }
    }
}

/**
 * Checks the current connectivity and reconnects if it is required.
 */
private synchronized void reconnectIfNecessary() {
    if (!mStarted && mClient == null)
        start();
}

/**
 * Query's the NetworkInfo via ConnectivityManager to return the current
 * connected state
 * 
 * @return boolean true if we are connected false otherwise
 */
private boolean isNetworkAvailable() {
    NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();
    return info == null ? false : info.isConnected();
}

/**
 * Verifies the client State with our local connected state
 * 
 * @return true if its a match we are connected false if we aren't connected
 */
private boolean isConnected() {
    if (mStarted && mClient != null && !mClient.isConnected()) {
        Log.i(DEBUG_TAG, "Mismatch between what we think is connected and what is connected");
        // writeToFile("Mismatch between what we think is connected and what is connected");
    }

    if (mClient != null) {
        return mStarted && mClient.isConnected() ? true : false;
    }

    return false;
}

/**
 * Receiver that listens for connectivity changes via ConnectivityManager
 */
private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {
    @Override
    public void onReceive(Context context, Intent intent) {
        // writeToFile("isNetworkAvailable = " + isNetworkAvailable());
        if (isNetworkAvailable() && !mStarted) {
            Log.i(DEBUG_TAG, "Connectivity Changed...");
            // Intent i = new Intent(context, MQTTService.class);
            // i.setAction(ACTION_RECONNECT);
            // context.startService(i);
            restartService();
        } else if (!isNetworkAvailable()) {
            stop();
        }
    }

};

/**
 * Sends a Keep Alive message to the specified topic
 * 
 * @see MQTT_KEEP_ALIVE_MESSAGE
 * @see MQTT_KEEP_ALIVE_TOPIC_FORMAT
 * @return MqttDeliveryToken specified token you can choose to wait for
 *         completion
 */
private synchronized MqttDeliveryToken sendKeepAlive() throws MqttConnectivityException, MqttPersistenceException, MqttException {
    if (!isConnected())
        throw new MqttConnectivityException();

    if (mKeepAliveTopic == null) {
        mKeepAliveTopic = mClient.getTopic(String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORMAT, user_ID));
    }

    Log.i(DEBUG_TAG, "Sending Keepalive to " + MQTT_BROKER);
    // writeToFile("Sending Keepalive to " + MQTT_BROKER);

    MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
    message.setQos(MQTT_KEEP_ALIVE_QOS);

    return mKeepAliveTopic.publish(message);
}

/**
 * Query's the AlarmManager to check if there is a keep alive currently
 * scheduled
 * 
 * @return true if there is currently one scheduled false otherwise
 */
private synchronized boolean hasScheduledKeepAlives() {
    Intent i = new Intent();
    i.setClass(this, MQTTService.class);
    i.setAction(ACTION_KEEPALIVE);
    PendingIntent pi = PendingIntent.getBroadcast(this, 0, i, PendingIntent.FLAG_NO_CREATE);

    return pi != null ? true : false;
}

@Override
public IBinder onBind(Intent arg0) {
    return null;
}

/**
 * Connectivity Lost from broker
 */
@Override
public void connectionLost(Throwable arg0) {
    stopKeepAlives();

    mClient = null;

    if (isNetworkAvailable()) {
        reconnectIfNecessary();
    }
}

/**
 * Publish Message Completion
 */
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
    // TODO Auto-generated method stub

}

/**
 * Received Message from broker
 */
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
    // Log.i(DEBUG_TAG,
    // "  Topic:\t" + topic.getName() + "  Message:\t"
    // + new String(message.getPayload()) + "  QoS:\t"
    // + message.getQos());

}

/**
 * MqttConnectivityException Exception class
 */
private class MqttConnectivityException extends Exception {
    private static final long serialVersionUID = -7385866796799469420L;
}

@Override
public void onDestroy() {
    try {
        mClient.unsubscribe(new String[] { TOPIC_CONNECTED, TOPIC_ASSIGNED, TOPIC_REFRESH });
        mClient.disconnect(0);

    } catch (Exception e) {
        // writeToFile("Exception - onDestroy() 1");
        e.printStackTrace();
    } finally {
        new WS_LOGOUT(this).execute(user_ID);
    }
}

public void restartService() {
    mKeepAliveTopic = null;
    actionStart(getApplicationContext()); // restart the service
}

}


person firetrap    schedule 15.04.2014    source источник


Ответы (1)


С какой задержкой вы можете жить, зная, что клиент отключен?

Вы можете использовать функцию Last Will and Testament, чтобы опубликовать значение в теме, когда сервер обнаруживает, что время сохранения активности MQTT истекло, без получения эхо-запроса от клиента.

Вы можете установить время активности во время соединения. Но в зависимости от ваших требований (батарея / использование сети) вам нужно решить, на что его настроить. Если я правильно помню, по умолчанию 30 секунд (может быть 60)

Когда ваш клиент подключается, он может установить флаг для постоянной темы, чтобы сказать, что он в сети, и LWT может установить для него значение 0.

e.g.

при подключении публикации "1" к клиенту / [uniqueid] / online

установите LWT для публикации "0" в client / [uniqueid] / online

person hardillb    schedule 15.04.2014
comment
Я хочу использовать время сохранения активности, если клиент ничего не говорит (не отправляет keepAlive), я предполагаю, что он мертв. - person firetrap; 16.04.2014
comment
Я хочу использовать время сохранения активности, если клиент ничего не говорит (не отправляет keepAlive), я предполагаю, что он мертв. Я использую 30-секундный интервал между хранениями. Я понимаю концепцию, но мне нужно получить информацию о моем сервере, который был подключен за последние 30 секунд, чтобы брокер сохранил эту информацию в каком-то месте, чтобы я мог связаться? заранее спасибо - person firetrap; 16.04.2014
comment
Вам не нужно публиковать 1 в онлайн-теме, вы можете опубликовать метку времени, это позволит вам узнать, когда клиент подключился к сети. Затем используйте LWT, чтобы установить его на 0. - person hardillb; 16.04.2014