Реализация шаблона параллелизма для обработки задач с помощью Go

Основы пула рабочих

Пул рабочих процессов (также известный как пул потоков) — это шаблон разработки программного обеспечения, в котором создается набор рабочих процессов (пул) для одновременной обработки из очереди задач.

В некоторых языках каждый рабочий поток — это поток (отсюда и название пул потоков), но в Go у нас есть горутины — облегченные потоки, управляемые средой выполнения Go, которые позволяют параллельное программирование.

Существует система очередей для доставки задач работникам и логика для получения задач работниками. Часто система очередей будет каналом, и бездействующие работники будут брать задачи в порядке очереди.

Преимущества

У шаблона пула рабочих есть ряд преимуществ. Во-первых, это простота и опыт разработчика. Поместив параллельный инструмент в пакет за API, разработчикам не нужно думать о рутинной работе по правильному закрытию каналов, предотвращению голодания рабочих процессов, сигнализации, синхронизации и т. д. Они просто знают: «Мне нужно обработать эти задачи, и вот несколько вызовов функций, которые помогут мне в этом!».

Рабочие пулы также могут быть полезны для повышения производительности. Используя определенный набор воркеров (вместо того, чтобы выделять горутины для каждой задачи), мы можем ограничить количество вещей, которые мы разветвляем. Многие системы имеют узкие места на различных уровнях (будь то количество сетевых запросов, запросы к хранилищу в режиме реального времени, скорость обработки в других частях системы и т. д.), а пул рабочих процессов позволяет нам одновременно обрабатывать задания, применяя контролируемое противодействие, чтобы избежать чрезмерное потребление ресурсов в будущем.

Кроме того, для длительных заданий обработки (например, обработки входящих запросов API) пул рабочих процессов позволяет избежать накладных расходов на создание новых рабочих процессов и их удаление для каждого задания. В Go этот процесс немного дешевле, так как горутины очень легкие, но в целом хорошо иметь в виду накладные расходы при использовании ресурсов.

Тюнинг

Размер рабочего пула должен быть настроен для конкретного приложения. Что следует учитывать:

  • Сколько задач приходит?
  • Как быстро могут приходить задачи? Являются ли они взрывными или более устойчивыми?
  • Что дальше от меня? Как быстро он может обрабатывать результаты этих задач (если применимо)?
  • Какую часть своих системных ресурсов я готов потратить на выполнение этих задач? Должна ли обработка расширяться до максимума, который может обработать система, или я должен ограничить это некоторой частью системы?
  • Должна ли численность рабочих быть динамичной? (сегодня мы это делать не будем)

Часто у инженера есть разумная оценка того, сколько рабочих ему нужно (в пределах порядка), и система может быть дополнительно настроена в зависимости от того, как она работает в репрезентативных интеграционных тестах, нагрузочных тестах или в реальном мире. Включив метрики или отслеживание, инженер может действительно отточить, если пул рабочих ресурсов перераспределен или недораспределен. Возможно, мы сможем изучить это более подробно в следующем посте.

Проектирование нашей системы

Сегодня мы реализуем очень простой (и многоразовый!) рабочий пул. Задачи будут определены как интерфейс (пул рабочих процессов не должен заботиться о том, что он обрабатывает, только как это делать), а незанятые рабочие процессы будут ждать на канале дополнительной работы. Нам также понадобится способ создавать, запускать, останавливать и добавлять работу в рабочий пул. Мы также создадим интерфейс для этого.

Ниже вы можете увидеть базовый алгоритм того, как работа отправляется в пул и вытягивается из него.

Мы начнем с определения интерфейса для нашего пула рабочих процессов и задач, которые он может обрабатывать:

На самом деле пулу рабочих процессов нужно только предоставить способ запуска и остановки обработки, а также способ добавления работы в очередь.

Заданиям нужен способ запуска, Execute(), и способ обработки любых ошибок, возникающих в результате задания, OnFailure(). С этим интерфейсом мы оставляем вызывающему коду большую гибкость, чтобы определять, что происходит, не будучи слишком предписывающим.

Хорошо, теперь нам нужно реализовать что-то, что соответствует интерфейсу Pool. Мы сделаем это с помощью SimplePool ниже:

Здесь важно то, что пул рабочих процессов имеет доступ к каналу, из которого можно потреблять работу (tasks), и предоставляет канал (quit) для остановки рабочих процессов при необходимости. Некоторые из секретных деталей включают в себя отслеживание количества воркеров в пуле и использование Once, чтобы гарантировать, что пул может быть запущен или остановлен только один раз.

Мы добавим конструктор, чтобы сообщить вызывающему коду, что мы хотим знать, сколько рабочих процессов находится в пуле и размер буферизованных каналов. Он также установит некоторые внутренние детали реализации:

Обратите внимание, что мы определили пару пользовательских ошибок — ErrNoWorkers и ErrNegativeChannelSize — это техника, упрощающая тесты (с которой вы можете ознакомиться в workerpool/workerpool_test.go в репозитории проекта).

Запуск и остановка пула информирует воркеров о необходимости запуска или остановки обработки работы из канала tasks:

AddWork() предоставляет метод добавления Task в рабочий пул. Для этой реализации мы заблокируем его, когда канал будет заполнен. Таким образом, мы используем select для освобождения любых заблокированных горутин AddWork(), когда рабочий пул остановлен:

Обратите внимание, что мы могли бы выбрать это как неблокирующее с небольшой настройкой — это своего рода выбор дилера, что этот пункт (и даже может быть требованием интерфейса включать оба!):

Теперь реализуем startWorkers, который создаст и запустит numWorkers воркеров для чтения из tasks канала и обработки задачи:

Используя оператор select, незанятый рабочий процесс может ожидать либо новой работы на канале tasks, либо сигнала о том, что пул рабочих процессов выполнен из канала quit.

Если он получит что-то на канале tasks, он выполнит все, что было определено как функция Execute() задачи, а в случае ошибки вызовет функцию OnFailure задачи, чтобы обработать все, что пошло не так.

Как только рабочий закончит обработку задачи, он вернется к ожиданию, пока не появится новая задача или пока пул рабочих процессов не будет остановлен.

Тестирование пакета

Автоматическое тестирование необходимо при разработке кода, особенно для пакетов, которые могут стать зависимостями для других проектов. Они также могут быть очень полезны для процесса разработки!

Они позволяют сделать паузу и рассмотреть интерфейсы и реализации (здесь я кое-что изменил в результате процесса тестирования), и они могут помочь вам найти ошибки реализации или неэффективность интерфейса задолго до того, как они станут проблемой.

С этой целью я добавил workerpool/workerpool_test.go, чтобы выполнить несколько вариантов использования, которые следует отработать.

Начнем с проверки работоспособности нашего конструктора:

Здесь мы в основном проверяем, что конструктор улавливает распространенные случаи неправильного ввода (например, отрицательный размер канала, 0 рабочих процессов и т. д.), и что мы фактически получаем Pool, когда ввод верен.

Далее мы проверим, как наши реализации Start() и Stop() правильно работают при многократном вызове:

Затем мы хотим убедиться, что рабочий пул действительно может обрабатывать работу. Для этого мы создадим тестовую реализацию, соответствующую нашему интерфейсу Task, и запустим некоторые из них через интерфейс workerpool.

Наш testTask реализует Execute() и OnFailure() для соответствия интерфейсу Task, но также добавляет инструментарий для проверки Execute(), возвращающего ошибку, определения, попали ли мы в случай сбоя, а также информирует вызывающий код, когда задача была обработана через WaitGroup.

Как мы видим, иногда имеет смысл создать специальную тестовую инфраструктуру, когда мы пытаемся проверить код. Поскольку рабочий пул работает с интерфейсами (а не с конкретными типами), мы можем просто создать новую вещь, которая реализует этот интерфейс, а не пытаться по-настоящему хакериться с кодом, который в противном случае был бы производственным!

Здесь мы, по сути, удостоверяемся, что можем создать рабочий пул, добавить в него работу и что работа будет обработана, как задумано. workerpool_test.go содержит аналогичную функцию для проверки логики обработки ошибок, но мы пропустим ее здесь для краткости.

Наконец, давайте удостоверимся, что все, что ожидает на AddWork, будет освобождено, если рабочий пул когда-либо будет остановлен (важно правильное управление ресурсами!):

Этот тест создает рабочий пул и набор горутин для добавления в него работы. Задач больше, чем емкость канала, поэтому некоторые горутины будут заблокированы в ожидании завершения задач.

Когда горутины, добавляющие задачи, разблокированы, они сигнализируют об этом вызовом Done() в строке 18. Это означает, что мы можем узнать, когда все горутины разблокированы, ожидая в группе ожидания wg.

Однако ожидание этого waitgroup заблокируется, поэтому нам нужен способ тайм-аута. Мы запустим горутину, чтобы дождаться завершения всех горутин добавления задач, а затем просигнализируем на новом канале, done. По сигналу на этом канале мы можем использовать select для ожидания этого сигнала done. Поскольку в случае сбоя done никогда не произойдет, мы потенциально можем ждать вечно (или, по крайней мере, до истечения времени нашего теста).

Чтобы обойти это, мы можем использовать time.After, который отправляет на канал по истечении указанного времени. Таким образом, мы можем сказать подождать, пока не пройдет секунда или пока не закончатся все горутины. В случае, когда мы достигли тайм-аута, мы потерпели неудачу (поскольку у нас все еще есть горутины, ожидающие AddWork после того, как наш рабочий пул был остановлен). Если мы получим сигнал на done, мы будем знать, что заблокированные горутины завершены, и рабочий пул работает как положено.

Улучшения

Пулы рабочих могут варьироваться от простых (гораздо проще!) до невероятно сложных. Некоторые общие улучшения включают в себя:

  • Добавление метрик, чтобы получить представление о том, сколько времени занимают задания, сколько времени простоя у работников и т. д.
  • Добавьте диспетчер заданий с логикой справедливого распределения, чтобы обеспечить справедливое выполнение различных типов задач для некоторого определения «справедливого».
  • Добавьте больше журналов.
  • Любая дополнительная логика, характерная для вашего приложения.

Кое-что из этого можно увидеть здесь в пуле справедливого распределения рабочих, который я разработал и реализовал для решения наиболее часто встречающейся ошибки Hashicorp Vault sev1!

Подведение итогов

Пулы исполнителей являются важным инструментом в вашем наборе инструментов для параллелизма, и теперь мы немного понимаем, как они работают. Сегодня мы спроектировали и построили достаточно простую (но пригодную для повторного использования!) реализацию и выяснили, как ее протестировать. Мы подумали о том, как его может использовать вызывающий код (например, AddWork() не должен блокироваться после остановки пула), и оставили возможности для его улучшения.

Как всегда, я приветствую любые отзывы и надеюсь, что они были полезны!