Брокер Eclipse Kapua: не авторизован для подписки на тему

Я пытаюсь подписаться на простую тему «foo» от клиента Eclipse Paho MQTT.

Брокер управляется Eclipse Kapua и доступен через tcp: // localhost: 1883 с учетными данными «kapua-broker» и «kapua-password».

Я публикую ценность таким образом:

send(new Payload.Builder().put("testKey","testVal"),"foo");

Это в основном отправляет карту ("testKey", "testVal") с темой "foo". Чтобы подписаться на эту тему, у меня есть следующий код (host = "localhost", port = 1883):

    String topic = "foo";
    String broker = "tcp://"+host+":"+Integer.toString(port);
    String clientId = "supply-chain-control-simulation-listener";
    String username = "kapua-broker";
    String password = "kapua-password";

    try {
        MqttClient client = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        connOpts.setCleanSession(true);
        logger.info("Connecting to broker: "+broker);
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                logger.info("Subscriptions stopped");
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                logger.info(s);
                logger.info(mqttMessage.getPayload().toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        client.connect(connOpts);
        if (client.isConnected())
            logger.info("Connected");
        else
            logger.error(client.getDebug().toString());
        client.subscribe(topic,2);
    } catch(MqttException me) {
        logger.error("reason "+me.getReasonCode());
        logger.error("msg "+me.getMessage());
        logger.error("loc "+me.getLocalizedMessage());
        logger.error("cause "+me.getCause());
        logger.error("excep "+me);
        me.printStackTrace();
    }

Подключение работает, но подписка выдает такую ​​ошибку:

15: 40: 03.240 [ActiveMQ NIO Worker 0] ПРЕДУПРЕЖДЕНИЕ oekbcpKapuaSecurityBrokerFilter - Пользователь 1: kapua-broker (прослушиватель-контроль-имитация-цепочки поставок - tcp: //172.17.0.1: 40888 - идентификатор соединения 1734706196170193882) не авторизован для чтения из: topic: //VirtualTopic.foo


person Arthur Feldman    schedule 13.07.2017    source источник


Ответы (2)


В Kapua вам разрешено публиковать / подписываться в соответствии с вашим разрешением пользователя.

Если у вашего пользователя есть только broker:connect разрешение, вы можете публиковать / подписываться только по теме:

{account-name}/{connectionClientId}/{semanticTopic}

В вашем конкретном случае вы должны публиковать / подписываться по теме:

kapus-sys/supply-chain-control-simulation-listener/foo

kapua-sys - это имя учетной записи, к которой принадлежит пользователь kapua-broker, а supply-chain-control-simulation-listener - это clientId, используемый для создания соединения.

Обратите внимание, что имя пользователя, используемое для подключения, и имя учетной записи - это две разные вещи в Kapua. У учетной записи несколько пользователей.

person Coduz    schedule 25.07.2017

Не subscribe сразу после вызова connect, а вместо этого переместите этот вызов в обратный вызов connectComplete:

IMqttAsyncClient client = new MqttAsyncClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
logger.info("Connecting to broker: "+broker);
client.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        logger.info("Connected");
        client.subscribe(topic,2);
    }
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("Subscriptions stopped");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        logger.info(s);
        logger.info(mqttMessage.getPayload().toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
});
client.connect(connOpts);

Тем не менее, ваша ошибка, вероятно, исходит от брокера MQTT, который вы используете, и вам необходимо настроить его, чтобы разрешить доступ к этой теме.

person Alexander Farber    schedule 14.07.2017
comment
Спасибо за Ваш ответ. Вы правы, это чище, но, к сожалению, я все равно получаю ту же ошибку. Проблема в том, что я не контролирую брокера, это часть проекта Kapua, и для его использования я просто запускаю контейнер докеров. - person Arthur Feldman; 14.07.2017