У нас есть несколько библиотек, которые помогают нам с маршрутами и перенаправляют запрос на нужную функцию (также известную как контроллер) на http-серверах, но как насчет MQTT?
В MQTT мы получаем тему с полезной нагрузкой, и мы должны понять, о чем этот запрос, и обработать этот запрос с помощью правильной функции.
server.on('published', (packet, client, cb) => {
// Whats to do with the package ?
});
Вопрос, на который я попытаюсь ответить, заключается в том, как обрабатывать запросы MQTT с парадигмой функционального программирования.
Процессор
Первый шаг — создать модуль, обрабатывающий запрос.
server.on('published', (packet) => {
processor(packet);
});
Процессор выяснит, о чем пакет, и перенаправит его нужному обработчику.
// processor.js
export default function processor(packet, client) {
const resolve = R.cond([
[COND, EXEC]
]);
resolve(packet);
}
Идея проста, COND — это функция, которая возвращает true
или false
, если true
будет выполняться функция EXEC. Подробнее о R.cond в документации Ramda.
resolve
— это функция, которая вернет все, что возвращает EXEC.
Так, например..
const paket = { topic: 'world' }
const resolve = R.cond([ [() => true, (p) => 'hello ' + p.topic] ]);
resolve(packet); // hello world
Как видите, это массив массивов, поэтому у нас может быть несколько условий.
const resolve = R.cond([
[() => true, (p) => 'hello ' + p.topic],
[OTHER_FN, OTHER_EXEC],
[ONE_MORE_FN, ONE_MORE_EXEC],
// ...
]);
Запрос
Используя этот подход, мы можем очень легко обрабатывать запросы, все, что нам нужно сделать, это создать модули javascript, как показано ниже.
// processRequestExample.js
export function execute(packet) {
// Process packet and return a valida packet or null
}
export function match(packet) {
// Return true if execute should be execute
}
export default [match, execute];
И импортируйте этот модуль на processor.js
import processRequestExample from './processRequestExample'
const resolve = R.cond([
processRequestExample
]);
публикация
И последнее, но не менее важное: если наш resolve
возвращает пакет, нам нужно опубликовать ответ. Давайте изменим наш processor.js
..
export default function processor(packet, publish) { const resolve = R.cond([ processRequestExample, ]);
const publishIfConditionalResolved = R.pipe( resolve, R.when(R.identity, publish), );
publishIfConditionalResolved(packet); }
R.when
разрешает вторую функцию, если первая возвращает значение true.
В этом случае, если
resolve
возвращает значение,publish
будет вызываться с этим значением.
И наш сервер,
server.on('published', (packet, client, cb) => {
const onPublish = (packetResp) => {
server.publish(packetResp, client, cb);
};
processor(packet, onPublish);
});
Это упрощает обработку запроса MQTT и при необходимости отвечает на этот запрос.
Более реальная ситуация
Давайте создадим калькулятор sum
, он сможет суммировать два числа из темы,
- При получении пакета из темы
calc/{num1},{num2}
- С полезной нагрузкой
{ "command": "sum" }
- Должен вернуть
{result}
с суммой{num1} + {num2}
#1 Добавляем хэндл к нашему процессору
import sumCalc from './sumCalc';
export default function processor(packet, publish) { const resolve = R.cond([ sumCalc ]);
// ... }
# 2 Реализовать sumCalc
Мы должны выяснить, должен ли наш модуль match
разрешать request
, и если да, то нам нужно извлечь и суммировать числа из темы.
функция совпадения
Заданием функции match
является ответ true
, если пакет должен быть разрешен этим модулем, или false
, если нет.
Для этого нам нужно проверить тему и полезную нагрузку.
- Чтобы проверить тему, мы можем использовать простое регулярное выражение
- Для проверки полезной нагрузки мы можем использовать ajv, который проверяет полезную нагрузку по схеме JSON.
Я прокомментирую код, чтобы было легче понять
// Topic regex validator const matchChannel = /^calc\/(\d+)\,(\d+)/im;
// JSON Schema validator const ajv = new Ajv(); const payloadRequestValidator = ajv.compile({ type: 'object', properties: { command: { type: 'string', pattern: '^sum$', }, }, required: ['command'], });
export function match() { // Validate topic against regex const topicLens = R.lensProp('topic'); const isTopicValid = R.pipe(R.view(topicLens), R.test(matchChannel));
// Get JSON const payloadLeans = R.lensProp('payload'); const payloadView = R.view(payloadLeans); const utf8String = prop => prop.toString('utf-8'); const getPayload = R.pipe(R.over(payloadLeans, utf8String), payloadView, JSON.parse);
// Check JSON schema const isPayloadValid = R.pipe(R.tryCatch(getPayload, R.F), payloadRequestValidator);
// If is a valid schema and valid topic, return true return R.both(isPayloadValid, isTopicValid); }
Здесь много команд Ramda, пожалуйста, ознакомьтесь с документацией для лучшего понимания :) ramdajs.com/docs
выполнить функцию
Теперь нам нужно реализовать функцию execute, которая должна
- Извлеките
n1
иn2
из темы - Сумма чисел
- Вернуть пакет с результатом
export function execute() { // Used to extract numbers const matchNumbers = R.match(matchChannel); const topicProp = R.lensProp('topic'); const viewTopic = R.view(topicProp); const transformToBuffer = obj => new Buffer.from(JSON.stringify(obj));
// Convert property n1 and n2 to int const transformations = { n1: parseInt, n2: parseInt }; const extractAndSum = R.pipe( // #1 viewTopic, matchNumbers, R.zipObj(['all', 'n1', 'n2']), R.pick(['n1', 'n2']), R.evolve(transformations), R.values, R.sum, transformToBuffer );
const payloadProp = R.lensProp('payload'); const setPayload = R.set(payloadProp); return R.chain(setPayload, extractAndSum); }
Если вы обратите внимание на #1
мы ..
- Получить значение темы в
viewTopic
- Извлечение чисел с помощью регулярного выражения в
matchNumbers
- Regex возвращает массив типа
['calc/2,3', '2', '5']
, поэтому мы конвертируем в объект{all: 'calc/2,3', n1: '2', n2: '5'}
вzipObj
- Вернуть объект с
n1
иn2
только вpick
- Преобразуйте свойства
n1
иn2
, используяtransformations
вevolve
- Возвращаемый массив только со значениями,
{n1: 2, n2: 5}
будет[2,5]
вvalues
- Суммируйте массив в
sum
- Преобразовать в буфер результат в
transformToBuffer
Экспорт по умолчанию
Вы должны заметить, что match
и execute
не имеют никаких параметров, причина в том, что нам это не нужно, мы можем экспортировать с помощью R.call
export default [R.call(match), R.call(execute)];
Вы можете найти полный код на моем github.
Не стесняйтесь задавать мне любые вопросы. :)
Нравится? Пожалуйста, нажмите ❤️ ниже, чтобы другие люди могли найти его.