Конвейеры данных с одним процессором в конечном итоге натолкнутся на узкое место. Давайте использовать параллельные процессоры с очередями Fan-in/Fan-out!
Я начал читать технические книги после окончания CS, чтобы расширить свои знания по многим темам, таким как системный дизайн, низкоуровневый CS, облачный родной и т. д. В настоящее время я читаю Cloud Native Go, который дает плавный знакомство с облачным миром с использованием преимуществ Go. Книга начинается с основ Go и облачных шаблонов, и именно здесь я подумал: Я должен написать об этих шаблонах!.
В этой серии сообщений в блоге я буду писать о нативных облачных шаблонах, которые я изучил, — также с иллюстрациями, используя Маним.
Проблема
Допустим, у нас есть постоянный источник данных (например, канал Go), и нам нужно обработать данные и поместить обработанные данные в целевой канал с минимально возможной задержкой.
В общем случае у нас есть функция процессора, которая находится посередине между источником и получателями, которая обрабатывает пакеты данных (пакет здесь — абстракция для простоты) один за другим.
Однако каждая функция обработки имеет задержку. Это может быть связано с использованием сети или ЦП, блокировкой системных вызовов и т. д. Если мы отправляем процессору достаточное количество пакетов в секунду, вуаля! У нас сейчас пробка!
Решение
У этой проблемы есть довольно простое решение: использование нескольких процессоров внутри конвейера данных. Таким образом, мы можем обрабатывать поток данных одновременно, что снижает общую задержку и уменьшает перегрузку конвейера.
Вот тут-то и появляются Fan-in и Fan-out…
Мы можем реализовать это решение, используя разделяемую память, такую как очереди сообщений.
При таком подходе мы разделим входящие пакеты данных на разные входные очереди. Затем собственный процессор каждой очереди будет принимать пакеты один за другим, обрабатывать данные и помещать обработанные данные в соответствующую выходную очередь. Наконец, пункт назначения (другой процессор, очередь или какая-либо другая система) будет принимать обработанные пакеты данных из каждой выходной очереди и объединять их в единый источник данных.
Первый подход, разделяющий источник данных (входные данные) на несколько источников данных (входных очередей), называется шаблоном Fan-out
. Второй, объединяющий несколько источников данных (выходных очередей) в один источник данных (назначение), называется шаблоном Fan-in
.
Для простоты мы указали одну очередь ввода и одну очередь вывода для каждого процессора в нашем конвейере. Мы могли бы использовать несколько очередей ввода-вывода на процессор или общую очередь между несколькими процессорами, в зависимости от общих требований к системе.
Перейти к реализации
Давайте запачкаем руки параллельным Go! Во-первых, давайте определим наши источники данных:
Функция stringGenerator
создает строковый канал только для приема, создает горутину, которая помещает строки с префиксом в канал и возвращает канал. Мы читаем из этого канала позже в коде разветвления.
Наша функция процессора тоже будет довольно простой:
Внутри функции процессора мы будем ждать произвольное время, чтобы имитировать задержку процессора.
Реализация разветвления
В реализации разветвления мы возьмем канал только для приема и вернем часть каналов только для приема:
Горутины, созданные внутри функции Splitter
, будут обрабатывать логику маршрутизации данных. Обратите внимание, что внутри горутин мы использовали один оператор range
для получения из канала source
:
for data := range source { dest <- data }
Это означает, что каждая горутина будет пытаться читать из канала в цикле, и первая выполнившая чтение получит следующий элемент. Другими словами, каждая горутина будет бороться за следующий экземпляр данных.
Мы могли бы использовать централизованное решение для разветвления вместо конкурирующих горутин. В этом случае мы бы определили master process
для распределения каждого входящего экземпляра данных (в нашем случае string
) по всем выходным каналам.
Внедрение вентилятора
В Fan-in мы, по сути, будем делать обратное Fan-out, с некоторыми отличиями:
Функция Aggregator
берет часть входных источников, предназначенных только для приема, и возвращает один выходной канал, предназначенный только для приема. Внутри мы создали горутину для каждого источника ввода, которая непрерывно читает из источника и заполняет выходной канал (destination
) полученными данными.
Обратите внимание, что мы использовали sync.WaitGroup
для ожидания завершения горутин агрегатора. После закрытия источника ввода (channel
) цикл for внутри соответствующей горутины завершится и завершит свою работу.
После того, как все входные источники закрыты, мы закончили агрегирование и теперь можем закрыть канал назначения. Это важный шаг, и если мы не закроем созданный нами канал, среда выполнения Go выдаст фатальную ошибку:
fatal error: all goroutines are asleep - deadlock!
основной()
Собрав все наши функции вместе, мы готовы запустить наш код:
TL;DR
- Конвейеры данных с одним процессором в конечном итоге натолкнутся на узкое место
- Полезно разделить источник ввода на несколько очередей и одновременно обрабатывать
- Этот паттерн разделения процесса-агрегации называется разветвлением на входе/разветвлением.
Спасибо за чтение.
Want to Connect? This story was originally posted at https://www.sazak.io/cloud-native-patterns-illustrated-fan-in-and-fan-out/