Динамически масштабируемый потребитель очереди

Мотивация

Недавно мы создали сервис, который обрабатывает все события между Salesforce и нашим ядром (основной API Uniplaces).

Каждый раз, когда на платформе происходит событие, которое необходимо представить в Salesforce, Core публикует сообщение в очереди FIFO. Затем служба опросит очередь на предмет сообщения, создаст полезную нагрузку с измененными данными и отправит ее в Salesforce.

Мы хотим, чтобы этот процесс был быстрым, чтобы информация в обеих системах была как можно скорее синхронизирована и масштабируема для обеспечения быстрого роста Uniplaces.

Пул потоков (также известный как рабочий пул) позволяет нам легко управлять коллекцией потребительских потоков с настраиваемым размером.

В этой статье я сосредоточусь на пуле потребителя / работника, который мы реализовали с учетом этого.

Архитектура

Основные функции, которые мы хотели получить от пула рабочих, заключались в следующем:

  • Динамическая масштабируемость: рабочий может сообщить пулу, действительно ли он обрабатывает сообщение, и если да, то пул немедленно создаст другого рабочего, вместо того, чтобы спать в течение X секунд. Это позволяет пулу быть динамичным в зависимости от количества сообщений в очереди.
  • Плавное завершение работы: пул переходит в состояние отключения, при котором новые рабочие не создаются, а работающие в данный момент могут завершить свою работу до ее выхода.

Имея это в виду, нашу архитектуру можно описать следующей схемой:

Несмотря на то, что это описывает именно то, что мы хотим, если бы мы реализовали это, как описано на диаграмме, это привело бы к очень запутанному коду с ifs и повсеместному повторению кода.

Поэтому мы решили, что лучше всего реализовать это как конечный автомат:

Это позволило нам иметь гораздо более чистый код, получая при этом те же результаты.

Хорошо ... хватит симпатичных диаграмм, давайте проверим код!

Реализация

Рабочий

Рабочий состоит из идентификатора и двух каналов для передачи своего статуса пулу.

Потребитель

Каждый рабочий вызовет этот метод Consume, где он опрашивает очередь и обрабатывает сообщение, обновляя пул о его состоянии на этом пути.

Пул рабочих

Штат

Состояние представлено структурой перехода, которая обновляется каждый цикл (внутренний основной цикл пула) и в некоторых случаях может сохранять информацию между состояниями.

Пул

Пул рабочих процессов инициализируется параметрами max worker, interval и timeout, что обеспечивает гибкость в разных средах, и объектом shutdown object, который будет управлять тем, как пул обрабатывает выход.

Метод Start представляет собой основной цикл системы, в котором обрабатывается текущее состояние. Этот метод получает функцию, которая будет вызываться рабочими.

Затем у нас есть функция processLaunchState, которая будет отвечать за создание нового рабочего и порождение новой горутины с функцией actionHandler.

Наконец, функция processWaitState будет прослушивать события от рабочих процессов (каналы завершения и обработки) или от самого приложения (канал завершения работы) или, если что-то пойдет не так, тайм-аут.

Примечание. Некоторые переходы довольно просты, поэтому их методы были опущены для простоты.

Завершение работы

Задача Shutdown - заставить главный поток ждать сигнала Interrupt от системы (Ctrl + C), который ping отправит пул инициировать его завершение, то есть новые рабочие процессы не создаются, а уже работающие успевают завершить работу. Затем он будет ждать второго сигнала Interrupt (который вызывает немедленное завершение работы), сигнала из пула, указывающего, что все рабочие процессы завершены, или тайм-аута, позволяющего завершить основной поток.

Команда

Этот потребитель выполняется из контекста CLI. Следующая команда запустит потребителя в производственной среде с максимум 10 рабочими и 10-секундным интервалом между каждым циклом:

GOENV=prod ./cli -w 10 -i 10

Команда создается следующим образом:

Как вы можете видеть в функции обработчика действий, WorkerPool создается с параметрами из вызова командной строки и обработчика завершения работы. Затем функция Start демонизируется, и основной поток ожидает сигнала выключения.

Будущие улучшения

Это первая итерация этой системы, поэтому есть некоторые аспекты, которые могут быть улучшены в будущем. Например, потоки пула должны быть предварительно инициализированы, ожидая получения работы. Это позволит избежать того, чтобы пул всегда создавал и уничтожал потоки, что является ресурсоемкой операцией. Еще одним улучшением было бы выделить пул отдельному поставщику и сделать его пригодным для использования в другом контексте, отличном от использования очереди.

Если у вас есть какие-либо вопросы или отзывы об этой статье, не стесняйтесь комментировать или писать мне в Твиттере @ https://twitter.com/jmdalmeida1.