Маршрутизация сообщений в Центре Интернета вещей Azure приводит только к потерянным сообщениям.

Пишу функцию сердцебиения для устройств. Следующий фрагмент кода создает конверт сообщения с помощью пакета Azure SDK для устройств:

private Message constructHeartbeatEnvelope(final HeartbeatEvent heartbeat) throws JsonProcessingException {
    String jsonString = mapper.writeValueAsString(heartbeat);
    System.out.println(jsonString);
    final Message message = new Message(jsonString.getBytes(StandardCharsets.UTF_8));
    message.setProperty("type", "heartbeat");
    message.setContentTypeFinal("application/json");
    message.setExpiryTime(HEARTBEAT_INTERVAL);
    return message;
}

В Центре Интернета вещей, которому принадлежит это устройство, у меня есть два следующих маршрута для тестирования:

+-----------------+----------------+--------------------------------------+------------+---------+
|      Name       |  Data Source   |            Routing Query             |  Endpoint  | Enabled |
+-----------------+----------------+--------------------------------------+------------+---------+
| heartbeat-route | DeviceMessages | $body.type.name = 'heartbeat/device' | events-dev | true    |
| events-dev-type | DeviceMessages | $type = 'heartbeat'                  | events-dev | true    |
+-----------------+----------------+--------------------------------------+------------+---------+

Текст сообщения выглядит так:

{
    "created": 1568104629007,
    "type": {
        "name": "heartbeat/device",
        "version": "1.0"
    },
    "origin": "iothubdeviceid",
    "content": {
        // heartbeat metadata
    },
    "originId": "S1"
}

Резервный маршрут отключен.

Конечная точка events-dev - это концентратор событий, у которого есть выделенная группа потребителей для этих контрольных событий. Последним в цепочке является приложение-функция, которое потребляет от этого концентратора событий и печатает только тело того, что оно получает, для целей отладки.

Однако, когда я просматриваю метрики в своем Центре Интернета вещей, он говорит, что ноль сообщений направляется в концентратор событий, а все отправленные сообщения «осиротели».

В разделе «Встроенные конечные точки» я также добавил дополнительную группу потребителей под названием heartbeats.

Я попытался активировать резервный маршрут, а затем использовать модифицированный образец из Python SDK, который принимает только сообщения, я подключился к концентратору событий IoT Hub и могу найти там сообщения, но мне вообще не удается заставить маршрутизацию работать. Я также вижу, что потерянные сообщения возвращаются к 0 в метриках, а количество сообщений, перенаправленных на резерв, увеличивается. Я также не получаю сообщений в heartbeats группе потребителей при таком подходе, только на $Default.

Что я здесь делаю не так?

Решение:

Судя по ответу ниже, помогло следующее:

Удалите $ для фильтрации по свойствам, добавленным пользователем:

+-----------------+----------------+--------------------------------------+------------+---------+
|      Name       |  Data Source   |            Routing Query             |  Endpoint  | Enabled |
+-----------------+----------------+--------------------------------------+------------+---------+
| heartbeat-route | DeviceMessages | $body.type.name = 'heartbeat/device' | events-dev | true    |
| events-dev-type | DeviceMessages | type = 'heartbeat'                   | events-dev | true    |
+-----------------+----------------+--------------------------------------+------------+---------+

Установите кодировку содержимого вручную в конверте сообщения:

private Message constructHeartbeatEnvelope(final HeartbeatEvent heartbeat) throws JsonProcessingException {
    String jsonString = mapper.writeValueAsString(heartbeat);
    System.out.println(jsonString);
    final Message message = new Message(jsonString.getBytes(StandardCharsets.UTF_8));
    message.setProperty("type", "heartbeat");
    message.setContentTypeFinal("application/json");
    message.setContentEncoding("utf-8"); // <---- This line
    message.setExpiryTime(HEARTBEAT_INTERVAL);
    return message;
}

Маршрутизация тела, похоже, не работает, несмотря на указанные выше исправления


person jokarl    schedule 10.09.2019    source источник


Ответы (1)


Следующее исправление:

  1. тип = 'сердцебиение'
  2. установите сообщение ContentEcoding для utf-8

Подробнее см. здесь.

person Roman Kiss    schedule 10.09.2019
comment
Кодировка содержимого установлена ​​на utf8 под капотом при использовании конструктора строк класса Message, но я также пробовал массив байтов и сам создавал массив байтов utf8, не повезло. Я попробую синтаксис свойства, спасибо - person jokarl; 10.09.2019
comment
Один из вышеперечисленных сработал, я еще не проверил, какой именно. Спасибо, это вызвало головную боль! - person jokarl; 10.09.2019
comment
Я пробовал маршрутизацию на type, и это работает. Однако телесная маршрутизация - нет. С type = 'heartbeat' сообщения доходят до моей функции, с $body.type.name = 'heartbeat/device' сообщения - нет. Хотя с фильтрацией свойств все в порядке, еще раз спасибо! - person jokarl; 10.09.2019
comment
Пробовали ли вы маршрутизацию тела с реальными данными или только с вашим образцом, приведенным выше. Образец не будет перенаправлен, потому что метаданные «комментарий // сердцебиение» не могут быть проанализированы. - person René; 12.09.2019