Архитектура, управляемая событиями, для автоматизации рабочих процессов
Node.js — это мощная платформа для создания масштабируемых и высокопроизводительных приложений. Одной из ключевых особенностей, которая делает его таким эффективным, является его способность эффективно обрабатывать события. Программирование, управляемое событиями, — это парадигма, которая становится все более популярной в современной разработке программного обеспечения. В этой статье мы рассмотрим, как реализовать управляемый событиями механизм рабочего процесса в Node.js.
Что такое механизм рабочего процесса, управляемый событиями?
Механизм рабочего процесса, управляемый событиями, — это инструмент, позволяющий автоматизировать бизнес-процессы путем управления и выполнения последовательности событий. Другими словами, это система, которая координирует задачи и события в определенном порядке. С помощью механизма рабочего процесса, управляемого событиями, вы можете легко определить этапы процесса и автоматизировать их выполнение.
Реализация механизма рабочего процесса, управляемого событиями, в Node.js
Чтобы реализовать управляемый событиями механизм рабочего процесса в Node.js, вы можете использовать модуль эмиттера событий Node.js. Этот модуль предоставляет способ отправки и прослушивания событий.
Архитектурный дизайн механизма рабочего процесса
Контейнер
Контейнер отслеживает определение потока во время выполнения, другие экземпляры адаптера и обработчики задач. Ключевые функции включают в себя обновление состояния потока и идентификацию следующих задач в потоке с учетом предшествующего идентификатора задачи.
Контроллер
Контроллер расширяет встроенный класс EventEmitter
, который прослушивает различные типы событий (например, TASK_INPUT
, TASK_POP
, TASK_WAIT_INPUT
и т. д.) и обрабатывает их, вызывая другие методы класса для управления выполнением задач в рабочем процессе.
Контроллер также взаимодействует с планировщиком задач, чтобы помещать задачи в очередь и получать статус после выполнения задачи.
Планировщик задач
Планировщик задач расширяет EventEmitter
. Он использует библиотеку sorted-btree для хранения задач в отсортированном порядке на основе их свойств when
и runId
. Свойство «when
» указывает временные метки, когда задача будет выполнена. В классе есть методы для помещения задач в очередь, пометки задач как выполненных и установки галочки для начала обработки задач из очереди.
Функция streamTasks
в планировщике вызывается рекурсивно и проверяет очередь на наличие задач, готовых к обработке, на основе их свойства when
.
Бегун
Runner принимает контейнер, очередь и объект события и возвращает объект с методами, которые можно использовать для запуска задачи. Например, runJSCode используется для выполнения кода JavaScript с использованием модуля isolated-vm, а runPYCode используется для выполнения кода Python с помощью обработчика потоков благодаря Pyodide.
Метод doneTask
принимает результат выполнения задачи и обновляет его в очереди задач для окончательного обновления результата.
Сценарии потока
Обычно язык программирования на основе потоков — это тип визуального языка программирования, который использует узлы для представления блоков кода и стрелки для представления потока данных между ними.
В этом дизайне язык оркестровки также реализован на уровне механизма рабочего процесса для представления визуального языка, который более удобен для разработчиков и более эффективен для выполнения рабочего процесса.
Язык оркестровки предоставляет набор примитивов и конструкций для определения потока бизнес-процесса, включая последовательность задач, зависимости между ними и условия выполнения каждой задачи. Он также может включать функции обработки ошибок, управления транзакциями и интеграции с внешними системами.
Давайте рассмотрим пример, чтобы увидеть, как он выглядит и что он делает. В этом примере у компании есть требование отправить пользователю электронное письмо со скидкой в день его рождения. Вот как работает рабочий процесс:
- Каждый день находите список пользователей, которые отмечают свой день рождения
- Выберите подходящий код скидки и отправьте его каждому найденному пользователю
В соответствии с требованием нам нужно определить две задачи с использованием языка оркестровки следующим образом:
Задача 1: Каждый день находить пользователей, которые отмечают свой день рождения
//Task to find users who has birthday at execution day function findUsers(id){ const run = (runner) => { // mock logic to find users who are on birthday let result = { users: [ { name: 'Kevin' }, { name: 'Lyly' }, ] }; console.log('Found users', result.users); // notify task done runner.doneTask({result}); } const spin = (init) => { // Cron job to run task every day init.cronJob('0 0 * * *'); } return TASK(id, run, spin) }
Задача 2: Выберите код скидки и отправьте его пользователю.
//Task to send discount function sendDiscounts(id){ const run = (runner) => { // user found from previous task let input = runner.getInput(); // mock logic of sending email console.log('Discount sent to', input['find_users'].users); // notify task done runner.doneTask(); } return TASK(id, run) }
Далее мы ставим задачи, определенные в последовательности выполнения.
return FLOW( SEQUENCE ( findUsers("find_users") , sendDiscounts("send_discounts") ) );
Полный сценарий потока SendDiscount.js
выглядит следующим образом:
import {FLOW, TASK, SEQUENCE} from "matiq-runtime"; export default function sendDiscountOnBD(){ //Task to find users who has birthday at execution day function findUsers(id){ const run = (runner) => { // mock logic to find users who are on birthday let result = { users: [ { name: 'Kevin' }, { name: 'Lyly' }, ] }; console.log('Found users', result.users); // notify task done runner.doneTask({result}); } const spin = (init) => { // Cron job to run task every day init.cronJob('0 0 * * *'); } return TASK(id, run, spin) } //Task to send discount function sendDiscounts(id){ const run = (runner) => { // user found from previous task let input = runner.getInput(); // mock logic of sending email console.log('Discount sent to', input['find_users'].users); // notify task done runner.doneTask(); } return TASK(id, run) } return FLOW( SEQUENCE ( findUsers("find_users") , sendDiscounts("send_discounts") ) ); }
Теперь загрузите и выполните поток в index.js
import {Runtime} from "matiq-runtime"; import * as url from 'url'; const script = 'flows/SendDiscount.js'; const flowPath = new URL(script, import.meta.url); const __dirname = url.fileURLToPath(new URL('./', import.meta.url)); const runtime = new Runtime(__dirname); runtime.start(true); runtime.loadCodeFlow(flowPath).then( () => { console.log('Flows loaded'); }); $ node . Found users [ { name: 'Kevin' }, { name: 'Lyly' } ] Discount sent to [ { name: 'Kevin' }, { name: 'Lyly' } ]
Будущая работа
Теперь, когда мы знаем об экспериментальном дизайне механизма автоматизации рабочих процессов в Node.js., предстоит еще многое сделать, чтобы механизм полностью работал в рабочем режиме. Полнофункциональный дизайн, разработанный для системы рабочего процесса, может включать (но не ограничиваться) следующее:
- Простой в использовании визуальный интерфейс для разработки рабочих процессов
- Поддержка сложной логики и манипулирования данными
- Интеграция с распространенными API и сервисами
- Возможности обработки ошибок и отладки
- Расширяемость и гибкость для добавления пользовательских модулей и узлов
Если вы заинтересованы в дальнейшем улучшении и развитии данного дизайна, не стесняйтесь разветвлять его в этом репозитории GitHub.
Пожалуйста, свяжитесь с нами, если у вас есть какие-либо вопросы или вы хотите обменяться инженерными знаниями.