Архитектура, управляемая событиями, для автоматизации рабочих процессов

Node.js — это мощная платформа для создания масштабируемых и высокопроизводительных приложений. Одной из ключевых особенностей, которая делает его таким эффективным, является его способность эффективно обрабатывать события. Программирование, управляемое событиями, — это парадигма, которая становится все более популярной в современной разработке программного обеспечения. В этой статье мы рассмотрим, как реализовать управляемый событиями механизм рабочего процесса в Node.js.

Что такое механизм рабочего процесса, управляемый событиями?

Механизм рабочего процесса, управляемый событиями, — это инструмент, позволяющий автоматизировать бизнес-процессы путем управления и выполнения последовательности событий. Другими словами, это система, которая координирует задачи и события в определенном порядке. С помощью механизма рабочего процесса, управляемого событиями, вы можете легко определить этапы процесса и автоматизировать их выполнение.

Реализация механизма рабочего процесса, управляемого событиями, в Node.js

Чтобы реализовать управляемый событиями механизм рабочего процесса в Node.js, вы можете использовать модуль эмиттера событий Node.js. Этот модуль предоставляет способ отправки и прослушивания событий.

Архитектурный дизайн механизма рабочего процесса

Контейнер

Контейнер отслеживает определение потока во время выполнения, другие экземпляры адаптера и обработчики задач. Ключевые функции включают в себя обновление состояния потока и идентификацию следующих задач в потоке с учетом предшествующего идентификатора задачи.

Контроллер

Контроллер расширяет встроенный класс EventEmitter, который прослушивает различные типы событий (например, TASK_INPUT, TASK_POP, TASK_WAIT_INPUT и т. д.) и обрабатывает их, вызывая другие методы класса для управления выполнением задач в рабочем процессе.

Контроллер также взаимодействует с планировщиком задач, чтобы помещать задачи в очередь и получать статус после выполнения задачи.

Планировщик задач

Планировщик задач расширяет EventEmitter. Он использует библиотеку sorted-btree для хранения задач в отсортированном порядке на основе их свойств when и runId. Свойство «when» указывает временные метки, когда задача будет выполнена. В классе есть методы для помещения задач в очередь, пометки задач как выполненных и установки галочки для начала обработки задач из очереди.

Функция streamTasks в планировщике вызывается рекурсивно и проверяет очередь на наличие задач, готовых к обработке, на основе их свойства when.

Бегун

Runner принимает контейнер, очередь и объект события и возвращает объект с методами, которые можно использовать для запуска задачи. Например, runJSCode используется для выполнения кода JavaScript с использованием модуля isolated-vm, а runPYCode используется для выполнения кода Python с помощью обработчика потоков благодаря Pyodide.

Метод doneTask принимает результат выполнения задачи и обновляет его в очереди задач для окончательного обновления результата.

Сценарии потока

Обычно язык программирования на основе потоков — это тип визуального языка программирования, который использует узлы для представления блоков кода и стрелки для представления потока данных между ними.

В этом дизайне язык оркестровки также реализован на уровне механизма рабочего процесса для представления визуального языка, который более удобен для разработчиков и более эффективен для выполнения рабочего процесса.

Язык оркестровки предоставляет набор примитивов и конструкций для определения потока бизнес-процесса, включая последовательность задач, зависимости между ними и условия выполнения каждой задачи. Он также может включать функции обработки ошибок, управления транзакциями и интеграции с внешними системами.

Давайте рассмотрим пример, чтобы увидеть, как он выглядит и что он делает. В этом примере у компании есть требование отправить пользователю электронное письмо со скидкой в ​​день его рождения. Вот как работает рабочий процесс:

  1. Каждый день находите список пользователей, которые отмечают свой день рождения
  2. Выберите подходящий код скидки и отправьте его каждому найденному пользователю

В соответствии с требованием нам нужно определить две задачи с использованием языка оркестровки следующим образом:

Задача 1: Каждый день находить пользователей, которые отмечают свой день рождения

    //Task to find users who has birthday at execution day
    function findUsers(id){
        const run = (runner) => {
            // mock logic to find users who are on birthday
            let result = {
                users: [
                    { name: 'Kevin' },
                    { name: 'Lyly' },
                ]
            };
            console.log('Found users', result.users);

            // notify task done
            runner.doneTask({result});
        }
        const spin = (init) => {
            // Cron job to run task every day
            init.cronJob('0 0 * * *');
        }

        return TASK(id, run, spin)
    }

Задача 2: Выберите код скидки и отправьте его пользователю.

    //Task to send discount
    function sendDiscounts(id){
        const run = (runner) => {
            // user found from previous task
            let input = runner.getInput();
            // mock logic of sending email
            console.log('Discount sent to', input['find_users'].users);
            // notify task done
            runner.doneTask();
        }
        
        return TASK(id, run)
    }

Далее мы ставим задачи, определенные в последовательности выполнения.

    return FLOW(
        SEQUENCE
        (
            findUsers("find_users") ,
            sendDiscounts("send_discounts")
        )
    );

Полный сценарий потока SendDiscount.js выглядит следующим образом:

import {FLOW, TASK, SEQUENCE} from "matiq-runtime";

export default function sendDiscountOnBD(){
    //Task to find users who has birthday at execution day
    function findUsers(id){
        const run = (runner) => {
            // mock logic to find users who are on birthday
            let result = {
                users: [
                    { name: 'Kevin' },
                    { name: 'Lyly' },
                ]
            };
            console.log('Found users', result.users);

            // notify task done
            runner.doneTask({result});
        }
        const spin = (init) => {
            // Cron job to run task every day
            init.cronJob('0 0 * * *');
        }

        return TASK(id, run, spin)
    }
    //Task to send discount
    function sendDiscounts(id){
        const run = (runner) => {
            // user found from previous task
            let input = runner.getInput();
            // mock logic of sending email
            console.log('Discount sent to', input['find_users'].users);
            // notify task done
            runner.doneTask();
        }
        
        return TASK(id, run)
    }

    return FLOW(
        SEQUENCE
        (
            findUsers("find_users") ,
            sendDiscounts("send_discounts")
        )
    );
}

Теперь загрузите и выполните поток в index.js

import {Runtime} from "matiq-runtime";
import * as url from 'url';

const script = 'flows/SendDiscount.js';
const flowPath = new URL(script, import.meta.url);
const __dirname = url.fileURLToPath(new URL('./', import.meta.url));

const runtime = new Runtime(__dirname);
runtime.start(true);

runtime.loadCodeFlow(flowPath).then( () => {
    console.log('Flows loaded');
});
$ node .
Found users [ { name: 'Kevin' }, { name: 'Lyly' } ]
Discount sent to [ { name: 'Kevin' }, { name: 'Lyly' } ]

Будущая работа

Теперь, когда мы знаем об экспериментальном дизайне механизма автоматизации рабочих процессов в Node.js., предстоит еще многое сделать, чтобы механизм полностью работал в рабочем режиме. Полнофункциональный дизайн, разработанный для системы рабочего процесса, может включать (но не ограничиваться) следующее:

  • Простой в использовании визуальный интерфейс для разработки рабочих процессов
  • Поддержка сложной логики и манипулирования данными
  • Интеграция с распространенными API и сервисами
  • Возможности обработки ошибок и отладки
  • Расширяемость и гибкость для добавления пользовательских модулей и узлов

Если вы заинтересованы в дальнейшем улучшении и развитии данного дизайна, не стесняйтесь разветвлять его в этом репозитории GitHub.

Пожалуйста, свяжитесь с нами, если у вас есть какие-либо вопросы или вы хотите обменяться инженерными знаниями.