Конвейеры данных с одним процессором в конечном итоге натолкнутся на узкое место. Давайте использовать параллельные процессоры с очередями Fan-in/Fan-out!

Я начал читать технические книги после окончания CS, чтобы расширить свои знания по многим темам, таким как системный дизайн, низкоуровневый CS, облачный родной и т. д. В настоящее время я читаю Cloud Native Go, который дает плавный знакомство с облачным миром с использованием преимуществ Go. Книга начинается с основ Go и облачных шаблонов, и именно здесь я подумал: Я должен написать об этих шаблонах!.

В этой серии сообщений в блоге я буду писать о нативных облачных шаблонах, которые я изучил, — также с иллюстрациями, используя Маним.

Проблема

Допустим, у нас есть постоянный источник данных (например, канал Go), и нам нужно обработать данные и поместить обработанные данные в целевой канал с минимально возможной задержкой.

В общем случае у нас есть функция процессора, которая находится посередине между источником и получателями, которая обрабатывает пакеты данных (пакет здесь — абстракция для простоты) один за другим.

Однако каждая функция обработки имеет задержку. Это может быть связано с использованием сети или ЦП, блокировкой системных вызовов и т. д. Если мы отправляем процессору достаточное количество пакетов в секунду, вуаля! У нас сейчас пробка!

Решение

У этой проблемы есть довольно простое решение: использование нескольких процессоров внутри конвейера данных. Таким образом, мы можем обрабатывать поток данных одновременно, что снижает общую задержку и уменьшает перегрузку конвейера.

Вот тут-то и появляются Fan-in и Fan-out…

Мы можем реализовать это решение, используя разделяемую память, такую ​​как очереди сообщений.

При таком подходе мы разделим входящие пакеты данных на разные входные очереди. Затем собственный процессор каждой очереди будет принимать пакеты один за другим, обрабатывать данные и помещать обработанные данные в соответствующую выходную очередь. Наконец, пункт назначения (другой процессор, очередь или какая-либо другая система) будет принимать обработанные пакеты данных из каждой выходной очереди и объединять их в единый источник данных.

Первый подход, разделяющий источник данных (входные данные) на несколько источников данных (входных очередей), называется шаблоном Fan-out. Второй, объединяющий несколько источников данных (выходных очередей) в один источник данных (назначение), называется шаблоном Fan-in.

Для простоты мы указали одну очередь ввода и одну очередь вывода для каждого процессора в нашем конвейере. Мы могли бы использовать несколько очередей ввода-вывода на процессор или общую очередь между несколькими процессорами, в зависимости от общих требований к системе.

Перейти к реализации

Давайте запачкаем руки параллельным Go! Во-первых, давайте определим наши источники данных:

Функция stringGenerator создает строковый канал только для приема, создает горутину, которая помещает строки с префиксом в канал и возвращает канал. Мы читаем из этого канала позже в коде разветвления.

Наша функция процессора тоже будет довольно простой:

Внутри функции процессора мы будем ждать произвольное время, чтобы имитировать задержку процессора.

Реализация разветвления

В реализации разветвления мы возьмем канал только для приема и вернем часть каналов только для приема:

Горутины, созданные внутри функции Splitter, будут обрабатывать логику маршрутизации данных. Обратите внимание, что внутри горутин мы использовали один оператор range для получения из канала source:

for data := range source {
 dest <- data
}

Это означает, что каждая горутина будет пытаться читать из канала в цикле, и первая выполнившая чтение получит следующий элемент. Другими словами, каждая горутина будет бороться за следующий экземпляр данных.

Мы могли бы использовать централизованное решение для разветвления вместо конкурирующих горутин. В этом случае мы бы определили master process для распределения каждого входящего экземпляра данных (в нашем случае string) по всем выходным каналам.

Внедрение вентилятора

В Fan-in мы, по сути, будем делать обратное Fan-out, с некоторыми отличиями:

Функция Aggregator берет часть входных источников, предназначенных только для приема, и возвращает один выходной канал, предназначенный только для приема. Внутри мы создали горутину для каждого источника ввода, которая непрерывно читает из источника и заполняет выходной канал (destination) полученными данными.

Обратите внимание, что мы использовали sync.WaitGroup для ожидания завершения горутин агрегатора. После закрытия источника ввода (channel) цикл for внутри соответствующей горутины завершится и завершит свою работу.

После того, как все входные источники закрыты, мы закончили агрегирование и теперь можем закрыть канал назначения. Это важный шаг, и если мы не закроем созданный нами канал, среда выполнения Go выдаст фатальную ошибку:

fatal error: all goroutines are asleep - deadlock!

основной()

Собрав все наши функции вместе, мы готовы запустить наш код:

TL;DR

  • Конвейеры данных с одним процессором в конечном итоге натолкнутся на узкое место
  • Полезно разделить источник ввода на несколько очередей и одновременно обрабатывать
  • Этот паттерн разделения процесса-агрегации называется разветвлением на входе/разветвлением.

Спасибо за чтение.

Want to Connect?

This story was originally posted at https://www.sazak.io/cloud-native-patterns-illustrated-fan-in-and-fan-out/