Динамически масштабируемый потребитель очереди
Мотивация
Недавно мы создали сервис, который обрабатывает все события между Salesforce и нашим ядром (основной API Uniplaces).
Каждый раз, когда на платформе происходит событие, которое необходимо представить в Salesforce, Core публикует сообщение в очереди FIFO. Затем служба опросит очередь на предмет сообщения, создаст полезную нагрузку с измененными данными и отправит ее в Salesforce.
Мы хотим, чтобы этот процесс был быстрым, чтобы информация в обеих системах была как можно скорее синхронизирована и масштабируема для обеспечения быстрого роста Uniplaces.
Пул потоков (также известный как рабочий пул) позволяет нам легко управлять коллекцией потребительских потоков с настраиваемым размером.
В этой статье я сосредоточусь на пуле потребителя / работника, который мы реализовали с учетом этого.
Архитектура
Основные функции, которые мы хотели получить от пула рабочих, заключались в следующем:
- Динамическая масштабируемость: рабочий может сообщить пулу, действительно ли он обрабатывает сообщение, и если да, то пул немедленно создаст другого рабочего, вместо того, чтобы спать в течение X секунд. Это позволяет пулу быть динамичным в зависимости от количества сообщений в очереди.
- Плавное завершение работы: пул переходит в состояние отключения, при котором новые рабочие не создаются, а работающие в данный момент могут завершить свою работу до ее выхода.
Имея это в виду, нашу архитектуру можно описать следующей схемой:
Несмотря на то, что это описывает именно то, что мы хотим, если бы мы реализовали это, как описано на диаграмме, это привело бы к очень запутанному коду с ifs и повсеместному повторению кода.
Поэтому мы решили, что лучше всего реализовать это как конечный автомат:
Это позволило нам иметь гораздо более чистый код, получая при этом те же результаты.
Хорошо ... хватит симпатичных диаграмм, давайте проверим код!
Реализация
Рабочий
Рабочий состоит из идентификатора и двух каналов для передачи своего статуса пулу.
Потребитель
Каждый рабочий вызовет этот метод Consume, где он опрашивает очередь и обрабатывает сообщение, обновляя пул о его состоянии на этом пути.
Пул рабочих
Штат
Состояние представлено структурой перехода, которая обновляется каждый цикл (внутренний основной цикл пула) и в некоторых случаях может сохранять информацию между состояниями.
Пул
Пул рабочих процессов инициализируется параметрами max worker, interval и timeout, что обеспечивает гибкость в разных средах, и объектом shutdown object, который будет управлять тем, как пул обрабатывает выход.
Метод Start представляет собой основной цикл системы, в котором обрабатывается текущее состояние. Этот метод получает функцию, которая будет вызываться рабочими.
Затем у нас есть функция processLaunchState, которая будет отвечать за создание нового рабочего и порождение новой горутины с функцией actionHandler.
Наконец, функция processWaitState будет прослушивать события от рабочих процессов (каналы завершения и обработки) или от самого приложения (канал завершения работы) или, если что-то пойдет не так, тайм-аут.
Примечание. Некоторые переходы довольно просты, поэтому их методы были опущены для простоты.
Завершение работы
Задача Shutdown - заставить главный поток ждать сигнала Interrupt от системы (Ctrl + C), который ping отправит пул инициировать его завершение, то есть новые рабочие процессы не создаются, а уже работающие успевают завершить работу. Затем он будет ждать второго сигнала Interrupt (который вызывает немедленное завершение работы), сигнала из пула, указывающего, что все рабочие процессы завершены, или тайм-аута, позволяющего завершить основной поток.
Команда
Этот потребитель выполняется из контекста CLI. Следующая команда запустит потребителя в производственной среде с максимум 10 рабочими и 10-секундным интервалом между каждым циклом:
GOENV=prod ./cli -w 10 -i 10
Команда создается следующим образом:
Как вы можете видеть в функции обработчика действий, WorkerPool создается с параметрами из вызова командной строки и обработчика завершения работы. Затем функция Start демонизируется, и основной поток ожидает сигнала выключения.
Будущие улучшения
Это первая итерация этой системы, поэтому есть некоторые аспекты, которые могут быть улучшены в будущем. Например, потоки пула должны быть предварительно инициализированы, ожидая получения работы. Это позволит избежать того, чтобы пул всегда создавал и уничтожал потоки, что является ресурсоемкой операцией. Еще одним улучшением было бы выделить пул отдельному поставщику и сделать его пригодным для использования в другом контексте, отличном от использования очереди.
Если у вас есть какие-либо вопросы или отзывы об этой статье, не стесняйтесь комментировать или писать мне в Твиттере @ https://twitter.com/jmdalmeida1.