Было бы пустой тратой сил объяснять эту концепцию в тексте, а не в коде, здесь давайте приведем несколько примеров, чтобы вы сначала получили общее представление о sync.WaitGroup
.
Играйте со следующим кодом на Go Playground: https://go.dev/play/
Пример 1. Дождитесь завершения нескольких горутин:
var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func(num int) { defer wg.Done() fmt.Println("Processing task", num) time.Sleep(1 * time.Second) }(i) } wg.Wait() fmt.Println("All tasks have been processed.") // Output: Processing task 4 Processing task 1 Processing task 0 Processing task 3 Processing task 2 All tasks have been processed.
В этом примере у нас есть цикл, который запускает 5 горутин, каждая из которых имитирует некоторую работу, засыпая на 1 секунду. Мы используем wg.Add(1)
, чтобы добавить новую задачу в группу ожидания, и wg.Done()
, чтобы сигнализировать о завершении задачи. Наконец, мы используем wg.Wait()
для блокировки, пока все задачи не будут выполнены.
Пример 2. Дождитесь завершения одной горутины:
var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() fmt.Println("Processing task") time.Sleep(1 * time.Second) }() wg.Wait() fmt.Println("Task has been processed.") //Output: Processing task Task has been processed.
В этом примере у нас есть только одна горутина, но мы по-прежнему используем WaitGroup
, чтобы дождаться ее завершения, прежде чем продолжить. Обратите внимание, что мы вызываем wg.Add(1)
перед запуском горутины и wg.Done()
после завершения горутины.
Пример 3 — Ожидание двух зависимых горутин:
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup var data []int var mu sync.Mutex // Worker 1: generate data wg.Add(1) go func() { defer wg.Done() for i := 1; i <= 10; i++ { mu.Lock() data = append(data, i) mu.Unlock() } }() // Worker 2: process data wg.Add(1) go func() { defer wg.Done() mu.Lock() for _, d := range data { fmt.Println(d * 2) } mu.Unlock() }() // Wait for workers to finish wg.Wait() } //output: 2 4 6 8 10 12 14 16 18 20
sync.WaitGroup не нужно инициализировать, достаточно просто объявить, так как ее нулевое значение полезно. В sync.WaitGroup есть три метода: Add, который увеличивает счетчик горутин для ожидания; Done, который уменьшает значение счетчика и вызывается горутиной по завершении; и Wait, который приостанавливает свою горутину до тех пор, пока счетчик не достигнет нуля. Add обычно вызывается один раз с указанием количества горутин, которые будут запущены. Done вызывается внутри горутины. Чтобы убедиться, что он вызывается, даже если горутина паникует, мы используем отсрочку.
Глубокое погружение в группы ожидания:
sync.WaitGroup не нужно инициализировать, достаточно просто объявить, так как ее нулевое значение полезно. В sync.WaitGroup есть три метода: Add, который увеличивает счетчик горутин для ожидания; Done, который уменьшает значение счетчика и вызывается горутиной по завершении; и Wait, который приостанавливает свою горутину до тех пор, пока счетчик не достигнет нуля. Add обычно вызывается один раз с указанием количества горутин, которые будут запущены. Done вызывается внутри горутины. Чтобы убедиться, что он вызывается, даже если горутина паникует, мы используем отсрочку.
Мы не передаем явно sync.WaitGroup. Есть две причины. Во-первых, вы должны убедиться, что каждое место, где используется sync.WaitGroup, использует один и тот же экземпляр. Если вы передадите sync.WaitGroup функции goroutine и не используете указатель, то у функции есть копия, и вызов Done не приведет к уменьшению исходной sync.WaitGroup. Используя замыкание для захвата sync.WaitGroup, мы гарантируем, что каждая горутина ссылается на один и тот же экземпляр.
Вторая причина — дизайн. Помните, что вы должны исключить параллелизм из своего API. Обычный шаблон — запуск горутины с замыканием, обертывающим бизнес-логику. Закрытие управляет проблемами, связанными с параллелизмом, а функция предоставляет алгоритм.
Чтобы подчеркнуть:
при создании API в Go обычно рекомендуется не допускать параллелизма в обработчики API и делегировать его другим частям кода, например, через горутины и замыкания.
Вот пример того, как использовать замыкание для управления параллелизмом в простой функции суммирования:
package main import ( "fmt" "sync" ) func main() { // create a slice of integers nums := []int{1, 2, 3, 4, 5} // create a wait group var wg sync.WaitGroup // create a channel to receive the sum result resultChan := make(chan int) // launch a goroutine with a closure to sum the numbers wg.Add(1) go func() { defer wg.Done() // iterate through the slice and sum the numbers sum := 0 for _, num := range nums { sum += num } // send the sum result to the channel resultChan <- sum }() // wait for the goroutine to finish and close the channel go func() { wg.Wait() close(resultChan) }() // receive the sum result from the channel and print it result := <-resultChan fmt.Println(result) } //Output: 15
Используя замыкание для управления параллелизмом, мы можем упростить функцию суммы и делегировать управление параллелизмом замыканию.
Вот еще один пример с обработчиками API:
package main import ( "fmt" "net/http" "strconv" "sync" ) func sum(nums []int) int { total := 0 for _, num := range nums { total += num } return total } func sumHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query()["nums"] if len(queryValues) == 0 { http.Error(w, "Missing 'nums' query parameter", http.StatusBadRequest) return } var nums []int for _, str := range queryValues { num, err := strconv.Atoi(str) if err != nil { http.Error(w, fmt.Sprintf("Invalid number '%s'", str), http.StatusBadRequest) return } nums = append(nums, num) } var wg sync.WaitGroup wg.Add(1) var result int go func() { defer wg.Done() result = sum(nums) }() wg.Wait() fmt.Fprintf(w, "%d", result) } func main() { http.HandleFunc("/sum", sumHandler) http.ListenAndServe(":8080", nil) }
В приведенном выше примере кода sync.WaitGroup
используется, чтобы гарантировать, что функция sum
завершила выполнение перед возвратом результата клиенту.
Приведу еще два примера для быстрого ознакомления:
Пример 1. Простая функция, которая вычисляет сумму среза целых чисел, используя замыкание для управления параллелизмом:
func sum(nums []int) int { var total int c := make(chan int) for _, n := range nums { go func(n int) { c <- n }(n) } for i := 0; i < len(nums); i++ { total += <-c } return total }
Функция использует замыкание для запуска отдельной горутины для каждого целого числа в срезе, которая отправляет целое число в канал. Затем основная горутина читает из канала и накапливает сумму. Это позволяет функции выполнять вычисления одновременно, не раскрывая подробности параллелизма вызывающей стороне API.
Пример 2 — более сложная функция, которая выполняет сетевую операцию и возвращает результат, используя замыкание для управления параллелизмом:
func fetch(url string) ([]byte, error) { var data []byte var err error c := make(chan error) go func() { resp, err := http.Get(url) if err != nil { c <- err return } defer resp.Body.Close() data, err = ioutil.ReadAll(resp.Body) if err != nil { c <- err return } c <- nil }() err = <-c return data, err }
В этом примере функция fetch
принимает URL-адрес в качестве входных данных и возвращает тело ответа в виде фрагмента байта вместе с ошибкой в случае сбоя запроса. Функция использует замыкание для запуска отдельной горутины, которая выполняет сетевую операцию и отправляет результат (или ошибку) в канал. Затем основная горутина читает из канала и возвращает результат или ошибку вызывающей стороне. Опять же, это позволяет функции одновременно выполнять сетевые операции, не раскрывая подробности параллелизма вызывающей стороне API.
Когда у вас есть несколько горутин, записывающих данные в один и тот же канал, вам необходимо убедиться, что канал, в который выполняется запись, закрывается только один раз. Для этого идеально подходит sync.WaitGroup:
func processAndGather(in <-chan int, processor func(int) int, num int) []int { out := make(chan int, num) var wg sync.WaitGroup wg.Add(num) for i:=0; i<num; i++{ go func() { defer wg.Done() for v := range in{ out <- processor(v) } }() } go func() { wg.Wait() close(out) }() var result []int for v := range out{ result = append(result, v) } return result }
Мы запускаем горутину мониторинга, которая ждет, пока не закончатся все обрабатывающие горутины. Когда они это делают, горутина мониторинга вызывает close на выходном канале. Цикл канала for-range завершается, когда out закрывается, а буфер пуст. Наконец, функция возвращает обработанные значения.
Хотя группы ожидания удобны, они не должны быть вашим первым выбором при координации горутин. Используйте их только тогда, когда вам нужно что-то очистить (например, закрыть канал, на который они все пишут) после выхода всех ваших рабочих горутин.