Параллельная программа состоит из логики, которая выполняется более чем в одном потоке или процессе. Параллелизм — это возможность одновременного выполнения нескольких задач в одной программе.
Когда вы начинаете писать программы, которые обрабатывают параллелизм, вы обнаружите, что у них есть структура, которая возникает из их требований и ограничений. В этой статье мы попытаемся обсудить общую структуру и характеристики параллельной программы с примерами.
Давайте возьмем тривиальный пример программы, которая выводит сообщение определенное количество раз, используя go-процедуру.
package main import ( "log" "time" ) func main() { waitChan := make(chan int, 1) go func() { for i:=1; i<=10; i++ { log.Println("hello world") time.Sleep(time.Duration(1)*time.Second) } waitChan <- 1 }() <-waitChan log.Println("done.") }
После запуска этого…
$ go run helloworld.go 2023/05/04 18:51:00 hello world 2023/05/04 18:51:01 hello world 2023/05/04 18:51:02 hello world 2023/05/04 18:51:03 hello world 2023/05/04 18:51:04 hello world 2023/05/04 18:51:05 hello world 2023/05/04 18:51:06 hello world 2023/05/04 18:51:07 hello world 2023/05/04 18:51:08 hello world 2023/05/04 18:51:09 hello world 2023/05/04 18:51:10 done.
Основной поток должен дождаться завершения выполнения go-процедуры, не выходя досрочно. Мы используем канал waitChan
в качестве примитива синхронизации между основным и рабочим потоком. Основной поток блокируется на канале до тех пор, пока рабочий поток не сигнализирует, что это делается путем записи в канал.
Синхронизация
Синхронизация является наиболее распространенным аспектом реализации параллельных программ. Есть два типа синхронизации, которые мы видим в параллельных программах.
- Синхронизация процессов — когда несколько исполнительных единиц — потоков или процессов — присоединяются к рукопожатию в определенной точке выполнения программы. В приведенном выше примере мы используем канал ожидания для синхронизации процессов между рабочим и основным потоком. На практике такие примитивы, как семафоры, также могут использоваться для синхронизации процессов.
- Синхронизация данных — когда определенные данные изменяются несколькими исполнительными устройствами, и нам необходимо сохранить целостность данных. Примитивы синхронизации, такие как мьютексы и группы ожидания, обычно используются для синхронизации данных. В Golang также можно использовать каналы.
Если параллельные программы неправильно синхронизируют общие объекты данных, это может привести к несогласованным состояниям, таким как состояние гонки.
… Состояние гонки
Давайте рассмотрим пример банковской проблемы. Здесь у нас есть счет, на который можно зачислить зарплату и сберегательные проценты в конце месяца.
// racecondition.go - demonstrating race condition // on a shared variable package main import ( "log" ) var ( waitChan chan int balance float64 = 100000.0 ) func calcInterest(rate float64) { log.Printf("calculting interest on %.2f\n", balance) interest := balance * rate / 100 balance += interest waitChan <- 1 } func depositSalary(salary float64) { log.Printf("depositing salary %.2f", salary) balance += salary waitChan <- 1 } func main() { waitChan = make(chan int, 1) go depositSalary(50000.0) go calcInterest(1.0) // waitChan synchronizes the threads <-waitChan <-waitChan log.Printf("Final balance: %.2f\n", balance) }
В идеале проценты должны начисляться и на заработную плату. Однако, если мы запустим указанную выше программу,
$ go run racecondition.go 2023/05/04 21:43:12 calculting interest on 100000.00 2023/05/04 21:43:12 depositing salary 50000.00 2023/05/04 21:43:12 Final balance: 151000.00
ПРИМЕЧАНИЕ. Если проценты начисляются также и на зарплату, окончательный баланс должен быть 151 500, а не 151 000.
Хотя в приказе о вызове, кажется, указано, что проценты будут рассчитываться после перечисления зарплаты, на практике, похоже, происходит наоборот. Это связано с тем, что в программе есть скрытое состояние гонки.
Это можно увидеть, запустив интерпретатор go с детектором состояния гонки.
$ go run -race racecondition.go 2023/05/04 21:48:14 depositing salary 50000.00 2023/05/04 21:48:14 calculting interest on 100000.00 ================== WARNING: DATA RACE Write at 0x0000005aa518 by goroutine 8: main.depositSalary() /home/anand/learning/go/concurrency/racecondition.go:22 +0x92 main.main.func2() /home/anand/learning/go/concurrency/racecondition.go:29 +0x33 Previous read at 0x0000005aa518 by goroutine 7: main.calcInterest() /home/anand/learning/go/concurrency/racecondition.go:13 +0x39 main.main.func1() /home/anand/learning/go/concurrency/racecondition.go:28 +0x33 Goroutine 8 (running) created at: main.main() /home/anand/learning/go/concurrency/racecondition.go:29 +0x86 Goroutine 7 (running) created at: main.main() /home/anand/learning/go/concurrency/racecondition.go:28 +0x7a ================== 2023/05/04 21:48:14 Final balance: 150000.00 Found 1 data race(s) exit status 66
В этом случае мы неправильно синхронизируем доступ к общей переменной данных, а именно balance
. Давайте исправим это.
Синхронизация данных
Мы можем синхронизировать доступ к общим данным, используя несколько методов. В Go пакет sync предлагает для этого богатый набор объектов. В этом случае мы будем использовать waitGroup
для синхронизации доступа двух рабочих процессов к общей переменной данных.
// datasync.go - Correct data synchronization for the bank // balance problem using a WaitGroup package main import ( "log" "sync" ) var ( waitChan chan int sem sync.WaitGroup balance float64 = 100000.0 ) func calcInterest(rate float64) { // Wait till balance is updated sem.Wait() log.Printf("calculting interest on %.2f\n", balance) interest := balance * rate / 100 balance += interest waitChan <- 1 } func updateBalance(value float64) { balance = value sem.Done() } func depositSalary(salary float64) { log.Printf("depositing salary %.2f", salary) updateBalance(balance + salary) waitChan <- 1 } func main() { waitChan = make(chan int, 1) sem.Add(1) go calcInterest(1.0) go depositSalary(50000.0) <-waitChan <-waitChan log.Printf("Final balance: %.2f\n", balance) }
Мы используем группу ожидания для синхронизации данных. Функция calcInterest
ожидает этого до тех пор, пока она не будет разблокирована завершением depositSalary
, которая вызывает новую функцию updateBalance
, которая сигнализирует группе ожидания, вызывая для нее Done()
.
Запуск этого теперь показывает, что состояние гонки исчезло. Мы получаем правильный ответ независимо от порядка вызова go-процедур.
$ go run -race datasync.go 2023/05/04 22:17:05 depositing salary 50000.00 2023/05/04 22:17:05 calculting interest on 150000.00 2023/05/04 22:17:05 Final balance: 151500.00
Тупики!
Если синхронизация данных реализована неправильно, это может привести к взаимоблокировкам. Взаимоблокировка — это ситуация, когда два или более потока или единицы выполнения ожидают примитива синхронизации для доступа или изменения ресурса, но с круговой зависимостью.
Это приводит к ситуации, когда ни один из потоков не может продолжить свои вычисления, что приводит к взаимоблокировке.
В Go взаимоблокировка возникает, когда go-процедура блокируется для ожидания отправки или получения по каналу. Например,
package main import ( "log" ) func main() { waitChan := make(chan int, 1) go func() { log.Println("hello world") }() <-waitChan log.Println("done.") }
ПРИМЕЧАНИЕ. Это небольшая модификация первой программы.
$ go run helloworld_deadlock.go 2023/05/04 22:27:56 hello world fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /home/anand/learning/go/concurrency/helloworld_deadlock.go:15 +0x45 exit status 2
Основной поток заблокирован на канале waitChan
, но нет ни одного потока, который записывает данные в этот канал, что приводит к тупиковой ситуации.
Другим распространенным примером взаимоблокировки являются циклические зависимости от мьютексов. Например, следующий код,
package main import ( "fmt" "sync" ) var ( lock1 sync.Mutex lock2 sync.Mutex waitChan chan int ) func worker1() { lock1.Lock() for { lock2.Lock() fmt.Println("worker1 acquired lock") lock2.Unlock() } lock1.Unlock() waitChan <- 1 } func worker2() { lock2.Lock() for { lock1.Lock() fmt.Println("worker2 acquired lock") lock1.Unlock() } lock2.Unlock() waitChan <- 1 } func main() { go worker1() go worker2() <-waitChan <-waitChan }
В коде существует циклическая зависимость между lock1
и lock2
, созданная worker1
получением lock1
во внешней области и непрерывным получением и освобождением lock2
во внутреннем цикле, а worker2
отражает это в обратном порядке в своем коде. Через некоторое время это приводит к взаимоблокировке, когда оба потока пытаются получить блокировку друг друга и терпят неудачу.
Очистка темы и выход
Программа может быть нарушена доставкой к ней сигналов. Сигналы в большинстве операционных систем доставляются асинхронно. Если программа неправильно обрабатывает сигналы, она может быть не в состоянии очистить себя и корректно завершить работу.
Давайте рассмотрим простой пример производителя/потребителя, использующего каналы.
// prodconsume.go - A simple producer/consumer concurrent program // illustrating signal handling package main import ( "log" "os" "os/signal" "syscall" "time" "crypto/rand" ) var ( msgChan chan string signals chan os.Signal waitChan chan int flag bool ) const ( poisonPill = "0xbaadf00d" ) // go-routine handling signals func sighandler() { sig := <-signals log.Printf("Signal received - %s\n", sig) // tell producer to exit flag = true } func producer() { data := make([]byte, 16) for { if flag { log.Printf("producer quitting") // tell consumer to exit msgChan <- poisonPill break } _, err := rand.Read(data) if err == nil { msgChan <- string(data) } time.Sleep(time.Duration(1)*time.Second) } // notify main thread waitChan <- 1 } func consumer() { for { var data string data = <- msgChan log.Printf("received - %x\n", data) if data == poisonPill { log.Printf("consumer quitting") break } } // notify main thread waitChan <- 1 } func main() { waitChan = make(chan int, 1) msgChan = make(chan string, 1) signals = make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) go sighandler() go producer() go consumer() for i:=1; i<=2; i++ { <-waitChan } log.Println("main thread - done") }
Здесь мы запускаем поток производителя, который продолжает считывать случайные 16 байтов с псевдослучайного устройства /dev/urandom
и отправлять их в канал. Потребитель продолжает читать из канала и печатает данные.
Мы используем go-процедуру обработчика сигналов, которая перехватывает любые прерывания или сигналы завершения, а затем чисто завершает программу, сначала указав производителю выйти с помощью флага. Затем производитель информирует потребителя о выходе, записывая в канал так называемую «таблетку яда» — специальную строку. И производитель, и потребитель дополнительно сигнализируют основному потоку о выходе, записывая в канал ожидания.
Пример запуска программы…
$ go run prodconsume.go 2023/05/05 13:53:44 received - 43d90daa78c1636942753540fda00522 2023/05/05 13:53:45 received - 02b3469072922ed2dab3ecfbcb3c27be 2023/05/05 13:53:46 received - fd376ddb38f3f086041178f69a875f64 2023/05/05 13:53:47 received - f863acbd51644cdb6df1417917638999
После отправки Ctrl-C
на терминал,
^C2023/05/05 13:53:48 Signal received - interrupt 2023/05/05 13:53:48 producer quitting 2023/05/05 13:53:48 received - 30786261616466303064 2023/05/05 13:53:48 consumer quitting 2023/05/05 13:53:48 main thread - done
Программа завершается чисто в предполагаемом порядке завершения потоков.
Структура параллельной программы
До сих пор мы видели достаточно вещей, которые могут быть правильными и неправильными при написании параллельной программы. Мы также видели кое-что из того, что происходит внутри параллельной программы.
Короче говоря, параллельная программа состоит из следующих разделов логики.
- Инициализация рабочего процесса/потока. Эти рабочие процессы могут выполнять одну и ту же задачу, т. е. вызывать одну и ту же функцию или разные задачи. Однако почти всегда параллельная программа инициализирует как минимум 1 или более таких потоков.
- Синхронизация данных. Большинству нетривиальных параллельных программ потребуется синхронизировать доступ к общим данным, чтобы избежать таких ситуаций, как условия гонки. В большинстве случаев чтение можно считать атомарным, то есть не требующим синхронизации, но запись обычно требует явной синхронизации. Здесь могут использоваться примитивы синхронизации, такие как мьютексы, семафоры или группы ожидания и т. д. В расширенных вариантах использования с типичными шаблонами, такими как Производитель/Потребитель, мы можем использовать примитивы обмена данными, такие как Очереди или в Go — каналы.
- Синхронизация процессов. Потоки или единицы выполнения требуют рукопожатия или синхронизации в какой-то момент своего вычислительного цикла. Как минимум, основной поток должен дождаться, пока другие потоки закончат свои вычисления. В Go для этого часто достаточно канала ожидания. Для синхронизации между несколькими потоками в Go можно использовать группы ожидания. Для более продвинутой синхронизации могут быть реализованы такие примитивы, как Барьеры.
- Сигнализация процесса.Сигнализацию можно рассматривать как расширенную форму синхронизации между потоками. При написании параллельных программ со сложным взаимодействием между рабочими процесс обычно ожидает выполнения определенного действия — обычно выполняемого другим классом или типом рабочего процесса. В таких случаях вторая рабочая группа обычно подает сигнал первой группе рабочих. Обычно используемые сигнальные примитивы включают объекты Semaphores, WaitGroups и Condition.
- Очистка и обработка сигналов. Наконец, потоки/рабочие процессы должны корректно завершаться после завершения своей работы, а также пытаться корректно завершать свои вычисления, если они прерываются сигналами. Следовательно, обработка сигналов важна при написании параллельных программ. При использовании каналов можно использовать такие методы, как Poison Pill, чтобы гарантировать, что работники получают четкий сигнал о выходе и очистке своих задач. Хорошо написанные функции обработчика сигналов могут гарантировать, что потоки завершат свое выполнение в правильном порядке.
В этой статье мы рассмотрели основные аспекты параллельной программы. В будущих статьях этой серии мы возьмем хорошо известные проблемы параллелизма и постепенно расширим каждую из этих тем, а также расширим некоторые разделы, описанные здесь.
Спасибо за прочтение!