Параллельная программа состоит из логики, которая выполняется более чем в одном потоке или процессе. Параллелизм — это возможность одновременного выполнения нескольких задач в одной программе.

Когда вы начинаете писать программы, которые обрабатывают параллелизм, вы обнаружите, что у них есть структура, которая возникает из их требований и ограничений. В этой статье мы попытаемся обсудить общую структуру и характеристики параллельной программы с примерами.

Давайте возьмем тривиальный пример программы, которая выводит сообщение определенное количество раз, используя go-процедуру.

package main

import (
 "log"
 "time"
)

func main() {

 waitChan := make(chan int, 1)
 
 go func() {
  for i:=1; i<=10; i++ {
   log.Println("hello world")
   time.Sleep(time.Duration(1)*time.Second)
  }
  waitChan <- 1
 }()

 <-waitChan

 log.Println("done.")
}

После запуска этого…

$ go run helloworld.go 
2023/05/04 18:51:00 hello world
2023/05/04 18:51:01 hello world
2023/05/04 18:51:02 hello world
2023/05/04 18:51:03 hello world
2023/05/04 18:51:04 hello world
2023/05/04 18:51:05 hello world
2023/05/04 18:51:06 hello world
2023/05/04 18:51:07 hello world
2023/05/04 18:51:08 hello world
2023/05/04 18:51:09 hello world
2023/05/04 18:51:10 done.

Основной поток должен дождаться завершения выполнения go-процедуры, не выходя досрочно. Мы используем канал waitChan в качестве примитива синхронизации между основным и рабочим потоком. Основной поток блокируется на канале до тех пор, пока рабочий поток не сигнализирует, что это делается путем записи в канал.

Синхронизация

Синхронизация является наиболее распространенным аспектом реализации параллельных программ. Есть два типа синхронизации, которые мы видим в параллельных программах.

  1. Синхронизация процессов — когда несколько исполнительных единиц — потоков или процессов — присоединяются к рукопожатию в определенной точке выполнения программы. В приведенном выше примере мы используем канал ожидания для синхронизации процессов между рабочим и основным потоком. На практике такие примитивы, как семафоры, также могут использоваться для синхронизации процессов.
  2. Синхронизация данных — когда определенные данные изменяются несколькими исполнительными устройствами, и нам необходимо сохранить целостность данных. Примитивы синхронизации, такие как мьютексы и группы ожидания, обычно используются для синхронизации данных. В Golang также можно использовать каналы.

Если параллельные программы неправильно синхронизируют общие объекты данных, это может привести к несогласованным состояниям, таким как состояние гонки.

… Состояние гонки

Давайте рассмотрим пример банковской проблемы. Здесь у нас есть счет, на который можно зачислить зарплату и сберегательные проценты в конце месяца.

// racecondition.go - demonstrating race condition
// on a shared variable
package main

import (
 "log"
)

var (
 waitChan chan int
 balance  float64 = 100000.0
)

func calcInterest(rate float64) {
 log.Printf("calculting interest on %.2f\n", balance)
 interest := balance * rate / 100
 balance += interest

 waitChan <- 1
}

func depositSalary(salary float64) {
 log.Printf("depositing salary %.2f", salary)
 balance += salary
 waitChan <- 1
}

func main() {
 waitChan = make(chan int, 1)
 go depositSalary(50000.0)
 go calcInterest(1.0)

// waitChan synchronizes the threads
 <-waitChan
 <-waitChan

 log.Printf("Final balance: %.2f\n", balance)
}

В идеале проценты должны начисляться и на заработную плату. Однако, если мы запустим указанную выше программу,

$ go run racecondition.go 
2023/05/04 21:43:12 calculting interest on 100000.00
2023/05/04 21:43:12 depositing salary 50000.00
2023/05/04 21:43:12 Final balance: 151000.00

ПРИМЕЧАНИЕ. Если проценты начисляются также и на зарплату, окончательный баланс должен быть 151 500, а не 151 000.

Хотя в приказе о вызове, кажется, указано, что проценты будут рассчитываться после перечисления зарплаты, на практике, похоже, происходит наоборот. Это связано с тем, что в программе есть скрытое состояние гонки.

Это можно увидеть, запустив интерпретатор go с детектором состояния гонки.

$ go run -race racecondition.go
2023/05/04 21:48:14 depositing salary 50000.00
2023/05/04 21:48:14 calculting interest on 100000.00
==================
WARNING: DATA RACE
Write at 0x0000005aa518 by goroutine 8:
  main.depositSalary()
      /home/anand/learning/go/concurrency/racecondition.go:22 +0x92
  main.main.func2()
      /home/anand/learning/go/concurrency/racecondition.go:29 +0x33

Previous read at 0x0000005aa518 by goroutine 7:
  main.calcInterest()
      /home/anand/learning/go/concurrency/racecondition.go:13 +0x39
  main.main.func1()
      /home/anand/learning/go/concurrency/racecondition.go:28 +0x33

Goroutine 8 (running) created at:
  main.main()
      /home/anand/learning/go/concurrency/racecondition.go:29 +0x86

Goroutine 7 (running) created at:
  main.main()
      /home/anand/learning/go/concurrency/racecondition.go:28 +0x7a
==================
2023/05/04 21:48:14 Final balance: 150000.00
Found 1 data race(s)
exit status 66

В этом случае мы неправильно синхронизируем доступ к общей переменной данных, а именно balance. Давайте исправим это.

Синхронизация данных

Мы можем синхронизировать доступ к общим данным, используя несколько методов. В Go пакет sync предлагает для этого богатый набор объектов. В этом случае мы будем использовать waitGroup для синхронизации доступа двух рабочих процессов к общей переменной данных.

// datasync.go - Correct data synchronization for the bank
// balance problem using a WaitGroup
package main

import (
 "log"
 "sync"
)

var (
 waitChan chan int
 sem      sync.WaitGroup
 balance  float64 = 100000.0
)

func calcInterest(rate float64) {
 // Wait till balance is updated
 sem.Wait()
 log.Printf("calculting interest on %.2f\n", balance)
 interest := balance * rate / 100
 balance += interest

 waitChan <- 1
}

func updateBalance(value float64) {
 balance = value
 sem.Done()
}

func depositSalary(salary float64) {
 log.Printf("depositing salary %.2f", salary)
 updateBalance(balance + salary)

 waitChan <- 1
}

func main() {
 waitChan = make(chan int, 1)
 sem.Add(1)

 go calcInterest(1.0)
 go depositSalary(50000.0)

 <-waitChan
 <-waitChan

 log.Printf("Final balance: %.2f\n", balance)
}

Мы используем группу ожидания для синхронизации данных. Функция calcInterest ожидает этого до тех пор, пока она не будет разблокирована завершением depositSalary, которая вызывает новую функцию updateBalance, которая сигнализирует группе ожидания, вызывая для нее Done().

Запуск этого теперь показывает, что состояние гонки исчезло. Мы получаем правильный ответ независимо от порядка вызова go-процедур.

$ go run -race datasync.go 
2023/05/04 22:17:05 depositing salary 50000.00
2023/05/04 22:17:05 calculting interest on 150000.00
2023/05/04 22:17:05 Final balance: 151500.00

Тупики!

Если синхронизация данных реализована неправильно, это может привести к взаимоблокировкам. Взаимоблокировка — это ситуация, когда два или более потока или единицы выполнения ожидают примитива синхронизации для доступа или изменения ресурса, но с круговой зависимостью.

Это приводит к ситуации, когда ни один из потоков не может продолжить свои вычисления, что приводит к взаимоблокировке.

В Go взаимоблокировка возникает, когда go-процедура блокируется для ожидания отправки или получения по каналу. Например,

package main

import (
 "log"
)

func main() {

 waitChan := make(chan int, 1)

 go func() {
  log.Println("hello world")
 }()

 <-waitChan

 log.Println("done.")
}

ПРИМЕЧАНИЕ. Это небольшая модификация первой программы.

$ go run helloworld_deadlock.go 
2023/05/04 22:27:56 hello world
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        /home/anand/learning/go/concurrency/helloworld_deadlock.go:15 +0x45
exit status 2

Основной поток заблокирован на канале waitChan, но нет ни одного потока, который записывает данные в этот канал, что приводит к тупиковой ситуации.

Другим распространенным примером взаимоблокировки являются циклические зависимости от мьютексов. Например, следующий код,

package main

import (
 "fmt"
 "sync"
)

var (
 lock1    sync.Mutex
 lock2    sync.Mutex
 waitChan chan int
)

func worker1() {

 lock1.Lock()
 for {
  lock2.Lock()
  fmt.Println("worker1 acquired lock")
  lock2.Unlock()
 }

 lock1.Unlock()

 waitChan <- 1
}

func worker2() {

 lock2.Lock()

 for {
  lock1.Lock()
  fmt.Println("worker2 acquired lock")
  lock1.Unlock()
 }

 lock2.Unlock()

 waitChan <- 1
}

func main() {
 go worker1()
 go worker2()

 <-waitChan
 <-waitChan
}

В коде существует циклическая зависимость между lock1 и lock2, созданная worker1 получением lock1 во внешней области и непрерывным получением и освобождением lock2 во внутреннем цикле, а worker2 отражает это в обратном порядке в своем коде. Через некоторое время это приводит к взаимоблокировке, когда оба потока пытаются получить блокировку друг друга и терпят неудачу.

Очистка темы и выход

Программа может быть нарушена доставкой к ней сигналов. Сигналы в большинстве операционных систем доставляются асинхронно. Если программа неправильно обрабатывает сигналы, она может быть не в состоянии очистить себя и корректно завершить работу.

Давайте рассмотрим простой пример производителя/потребителя, использующего каналы.

// prodconsume.go - A simple producer/consumer concurrent program 
// illustrating signal handling
package main

import (
 "log"
 "os"
 "os/signal"
 "syscall"
 "time"
 "crypto/rand"
)

var (
 msgChan chan string
 signals chan os.Signal
 waitChan chan int
 flag bool
)

const (
 poisonPill = "0xbaadf00d"
)

// go-routine handling signals
func sighandler() {

 sig := <-signals
 log.Printf("Signal received - %s\n", sig)
 // tell producer to exit
 flag = true
}

func producer() {

 data := make([]byte, 16)

 for {
  if flag {
   log.Printf("producer quitting")
   // tell consumer to exit
   msgChan <- poisonPill
   break
  }
  
  _, err := rand.Read(data)
  if err == nil {
   msgChan <- string(data)
  }
  time.Sleep(time.Duration(1)*time.Second)
  
 }

 // notify main thread
 waitChan <- 1
}

func consumer() {

 for {
  var data string
  
  data = <- msgChan
  log.Printf("received - %x\n", data)
  if data == poisonPill {
   log.Printf("consumer quitting")   
   break
  }
 }

 // notify main thread 
 waitChan <- 1 
}

func main() {

 waitChan = make(chan int, 1)
 msgChan = make(chan string, 1)
 signals = make(chan os.Signal, 1)

 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

 go sighandler()
 go producer()
 go consumer()
 
 for i:=1; i<=2; i++ {
  <-waitChan
 }

 log.Println("main thread - done")
}

Здесь мы запускаем поток производителя, который продолжает считывать случайные 16 байтов с псевдослучайного устройства /dev/urandom и отправлять их в канал. Потребитель продолжает читать из канала и печатает данные.

Мы используем go-процедуру обработчика сигналов, которая перехватывает любые прерывания или сигналы завершения, а затем чисто завершает программу, сначала указав производителю выйти с помощью флага. Затем производитель информирует потребителя о выходе, записывая в канал так называемую «таблетку яда» — специальную строку. И производитель, и потребитель дополнительно сигнализируют основному потоку о выходе, записывая в канал ожидания.

Пример запуска программы…

$ go run prodconsume.go
2023/05/05 13:53:44 received - 43d90daa78c1636942753540fda00522
2023/05/05 13:53:45 received - 02b3469072922ed2dab3ecfbcb3c27be
2023/05/05 13:53:46 received - fd376ddb38f3f086041178f69a875f64
2023/05/05 13:53:47 received - f863acbd51644cdb6df1417917638999

После отправки Ctrl-C на терминал,

^C2023/05/05 13:53:48 Signal received - interrupt
2023/05/05 13:53:48 producer quitting
2023/05/05 13:53:48 received - 30786261616466303064
2023/05/05 13:53:48 consumer quitting
2023/05/05 13:53:48 main thread - done

Программа завершается чисто в предполагаемом порядке завершения потоков.

Структура параллельной программы

До сих пор мы видели достаточно вещей, которые могут быть правильными и неправильными при написании параллельной программы. Мы также видели кое-что из того, что происходит внутри параллельной программы.

Короче говоря, параллельная программа состоит из следующих разделов логики.

  1. Инициализация рабочего процесса/потока. Эти рабочие процессы могут выполнять одну и ту же задачу, т. е. вызывать одну и ту же функцию или разные задачи. Однако почти всегда параллельная программа инициализирует как минимум 1 или более таких потоков.
  2. Синхронизация данных. Большинству нетривиальных параллельных программ потребуется синхронизировать доступ к общим данным, чтобы избежать таких ситуаций, как условия гонки. В большинстве случаев чтение можно считать атомарным, то есть не требующим синхронизации, но запись обычно требует явной синхронизации. Здесь могут использоваться примитивы синхронизации, такие как мьютексы, семафоры или группы ожидания и т. д. В расширенных вариантах использования с типичными шаблонами, такими как Производитель/Потребитель, мы можем использовать примитивы обмена данными, такие как Очереди или в Go — каналы.
  3. Синхронизация процессов. Потоки или единицы выполнения требуют рукопожатия или синхронизации в какой-то момент своего вычислительного цикла. Как минимум, основной поток должен дождаться, пока другие потоки закончат свои вычисления. В Go для этого часто достаточно канала ожидания. Для синхронизации между несколькими потоками в Go можно использовать группы ожидания. Для более продвинутой синхронизации могут быть реализованы такие примитивы, как Барьеры.
  4. Сигнализация процесса.Сигнализацию можно рассматривать как расширенную форму синхронизации между потоками. При написании параллельных программ со сложным взаимодействием между рабочими процесс обычно ожидает выполнения определенного действия — обычно выполняемого другим классом или типом рабочего процесса. В таких случаях вторая рабочая группа обычно подает сигнал первой группе рабочих. Обычно используемые сигнальные примитивы включают объекты Semaphores, WaitGroups и Condition.
  5. Очистка и обработка сигналов. Наконец, потоки/рабочие процессы должны корректно завершаться после завершения своей работы, а также пытаться корректно завершать свои вычисления, если они прерываются сигналами. Следовательно, обработка сигналов важна при написании параллельных программ. При использовании каналов можно использовать такие методы, как Poison Pill, чтобы гарантировать, что работники получают четкий сигнал о выходе и очистке своих задач. Хорошо написанные функции обработчика сигналов могут гарантировать, что потоки завершат свое выполнение в правильном порядке.

В этой статье мы рассмотрели основные аспекты параллельной программы. В будущих статьях этой серии мы возьмем хорошо известные проблемы параллелизма и постепенно расширим каждую из этих тем, а также расширим некоторые разделы, описанные здесь.

Спасибо за прочтение!