У нас есть несколько библиотек, которые помогают нам с маршрутами и перенаправляют запрос на нужную функцию (также известную как контроллер) на http-серверах, но как насчет MQTT?

В MQTT мы получаем тему с полезной нагрузкой, и мы должны понять, о чем этот запрос, и обработать этот запрос с помощью правильной функции.

server.on('published', (packet, client, cb) => {
  // Whats to do with the package ?
});

Вопрос, на который я попытаюсь ответить, заключается в том, как обрабатывать запросы MQTT с парадигмой функционального программирования.

Процессор

Первый шаг — создать модуль, обрабатывающий запрос.

server.on('published', (packet) => {
  processor(packet);
});

Процессор выяснит, о чем пакет, и перенаправит его нужному обработчику.

// processor.js
export default function processor(packet, client) {
  const resolve = R.cond([
    [COND, EXEC]
  ]);
  resolve(packet);
}

Идея проста, COND — это функция, которая возвращает true или false , если trueбудет выполняться функция EXEC. Подробнее о R.cond в документации Ramda.

resolve — это функция, которая вернет все, что возвращает EXEC.

Так, например..

const paket = {
  topic: 'world'
}
const resolve = R.cond([
  [() => true, (p) => 'hello ' + p.topic]
]);
resolve(packet); // hello world

Как видите, это массив массивов, поэтому у нас может быть несколько условий.

const resolve = R.cond([
  [() => true, (p) => 'hello ' + p.topic],
  [OTHER_FN, OTHER_EXEC],
  [ONE_MORE_FN, ONE_MORE_EXEC],
  // ...
]);

Запрос

Используя этот подход, мы можем очень легко обрабатывать запросы, все, что нам нужно сделать, это создать модули javascript, как показано ниже.

// processRequestExample.js
export function execute(packet) {
  // Process packet and return a valida packet or null
}
export function match(packet) {
  // Return true if execute should be execute
}
export default [match, execute];

И импортируйте этот модуль на processor.js

import processRequestExample from './processRequestExample'
const resolve = R.cond([
  processRequestExample
]);

публикация

И последнее, но не менее важное: если наш resolve возвращает пакет, нам нужно опубликовать ответ. Давайте изменим наш processor.js ..

export default function processor(packet, publish) {
  const resolve = R.cond([
    processRequestExample,
  ]);
  const publishIfConditionalResolved = R.pipe(
    resolve,
    R.when(R.identity, publish),
  );
  publishIfConditionalResolved(packet);
}

R.when разрешает вторую функцию, если первая возвращает значение true.

В этом случае, если resolve возвращает значение, publish будет вызываться с этим значением.

И наш сервер,

server.on('published', (packet, client, cb) => {
  const onPublish = (packetResp) => {
    server.publish(packetResp, client, cb);
  };
  processor(packet, onPublish);
});

Это упрощает обработку запроса MQTT и при необходимости отвечает на этот запрос.

Более реальная ситуация

Давайте создадим калькулятор sum, он сможет суммировать два числа из темы,

  • При получении пакета из темы calc/{num1},{num2}
  • С полезной нагрузкой { "command": "sum" }
  • Должен вернуть {result} с суммой {num1} + {num2}

#1 Добавляем хэндл к нашему процессору

import sumCalc from './sumCalc';
export default function processor(packet, publish) {
  const resolve = R.cond([
    sumCalc
  ]);
  // ...
}

# 2 Реализовать sumCalc

Мы должны выяснить, должен ли наш модуль match разрешать request, и если да, то нам нужно извлечь и суммировать числа из темы.

функция совпадения

Заданием функции match является ответ true, если пакет должен быть разрешен этим модулем, или false, если нет.

Для этого нам нужно проверить тему и полезную нагрузку.

  • Чтобы проверить тему, мы можем использовать простое регулярное выражение
  • Для проверки полезной нагрузки мы можем использовать ajv, который проверяет полезную нагрузку по схеме JSON.

Я прокомментирую код, чтобы было легче понять

// Topic regex validator
const matchChannel = /^calc\/(\d+)\,(\d+)/im;
// JSON Schema validator
const ajv = new Ajv();
const payloadRequestValidator = ajv.compile({
  type: 'object',
  properties: {
    command: {
      type: 'string',
      pattern: '^sum$',
    },
  },
  required: ['command'],
});
export function match() {
  // Validate topic against regex
  const topicLens = R.lensProp('topic');
  const isTopicValid = R.pipe(R.view(topicLens), R.test(matchChannel));
  // Get JSON
  const payloadLeans = R.lensProp('payload');
  const payloadView = R.view(payloadLeans);
  const utf8String = prop => prop.toString('utf-8');
  const getPayload = R.pipe(R.over(payloadLeans, utf8String), payloadView, JSON.parse);
  // Check JSON schema
  const isPayloadValid = R.pipe(R.tryCatch(getPayload, R.F), payloadRequestValidator);
  // If is a valid schema and valid topic, return true
  return R.both(isPayloadValid, isTopicValid);
}

Здесь много команд Ramda, пожалуйста, ознакомьтесь с документацией для лучшего понимания :) ramdajs.com/docs

выполнить функцию

Теперь нам нужно реализовать функцию execute, которая должна

  1. Извлеките n1 и n2 из темы
  2. Сумма чисел
  3. Вернуть пакет с результатом
export function execute() {
  // Used to extract numbers
  const matchNumbers = R.match(matchChannel);
  const topicProp = R.lensProp('topic');
  const viewTopic = R.view(topicProp);
  const transformToBuffer = obj => new Buffer.from(JSON.stringify(obj));
  // Convert property n1 and n2 to int
  const transformations = { n1: parseInt, n2: parseInt };
  const extractAndSum = R.pipe( // #1
    viewTopic,
    matchNumbers,
    R.zipObj(['all', 'n1', 'n2']),
    R.pick(['n1', 'n2']),
    R.evolve(transformations),
    R.values,
    R.sum,
    transformToBuffer
  );
  const payloadProp = R.lensProp('payload');
  const setPayload = R.set(payloadProp);
  return R.chain(setPayload, extractAndSum);
}

Если вы обратите внимание на #1 мы ..

  1. Получить значение темы в viewTopic
  2. Извлечение чисел с помощью регулярного выражения в matchNumbers
  3. Regex возвращает массив типа ['calc/2,3', '2', '5'] , поэтому мы конвертируем в объект {all: 'calc/2,3', n1: '2', n2: '5'} в zipObj
  4. Вернуть объект с n1 и n2 только в pick
  5. Преобразуйте свойства n1 и n2, используя transformations в evolve
  6. Возвращаемый массив только со значениями, {n1: 2, n2: 5} будет [2,5] в values
  7. Суммируйте массив в sum
  8. Преобразовать в буфер результат в transformToBuffer

Экспорт по умолчанию

Вы должны заметить, что match и execute не имеют никаких параметров, причина в том, что нам это не нужно, мы можем экспортировать с помощью R.call

export default [R.call(match), R.call(execute)];

Вы можете найти полный код на моем github.

Не стесняйтесь задавать мне любые вопросы. :)

Нравится? Пожалуйста, нажмите ❤️ ниже, чтобы другие люди могли найти его.