Лучший способ узнать, как что-то работает, - это собрать самому.

Go - мощный, но простой язык.

Я использую Golang последние четыре года, и мне это нравится. Конечно, это относительно молодой язык с некоторыми шероховатостями и неэффективностью. Но каждый недостаток дополнен уникальной особенностью.

И одна из моих любимых функций Go - это каналы.

Для тех, кто не знаком с Go, каналы - это механизм, позволяющий горутинам (которые похожи на потоки) передавать данные от одного к другому. Это упрощает написание потоковобезопасных параллельных программ и предотвращает состояние гонки.

Для большинства других языков традиционный способ передачи данных из одного потока в другой - через разделяемую память, которая защищена блокировкой.

Один поток записывает данные в память, а другой поток читает или обновляет данные из того же места в памяти. Но потоки непредсказуемы и могут в конечном итоге конфликтовать друг с другом, поэтому необходима блокировка.

Однако правильно использовать блокировку бывает непросто. Это источник целого ряда трудно обнаруживаемых ошибок. Каналы позволяют избежать этих ошибок, абстрагируя эту сложность до простого в использовании интерфейса.

Одна горутина помещает данные в канал: channel <- data

И другая горутина может потреблять данные из этого канала: data := <- channel

Хотя каналы внешне просты, они позволяют создавать мощное программное обеспечение. Однако только недавно я начал задаваться вопросом - как на самом деле работают каналы? Как они реализованы под капотом?

Чтобы ответить на этот вопрос, я подумал, что будет интересно создать каналы с нуля, используя только стандартную библиотеку Go. Что я и сделал.

В этой статье будет рассказано, как я создавал свои собственные импровизированные каналы и как вы тоже можете это сделать. Попутно я расскажу, как на самом деле работают каналы и как они реализуются изнутри.

Итак, приступим к делу.

Интерфейс

Прежде чем мы фактически перейдем к кодированию, давайте сначала определим, что могут делать каналы и какие методы нам нужно будет реализовать. Существует несколько способов взаимодействия с каналами, но для краткости мы сосредоточимся на 4 методах, из которых состоит наш интерфейс.

послать

Отправить передает данные в канал. Если канал заполнен, эта горутина будет заблокирована, пока не останется больше места.

Recv

Recv извлекает данные из переданного канала. Если канал пуст, горутина будет блокироваться до тех пор, пока данные не будут добавлены.

Закрыть

Закрыть фактически закроет канал, поэтому больше в него нельзя будет отправлять данные. Однако вы по-прежнему можете читать любые данные, которые все еще находятся в канале.

Далее

Далее используется для перебора данных в канале до тех пор, пока канал не будет закрыт и пуст.

type Channel interface {
   Send(interface{})
   Recv() (interface{}, bool)
   Close()
   Next() bool
}

Реализация

В этой статье я сосредоточусь на буферизованных каналах, поскольку они создают некоторые интересные проблемы.

Когда вы углубляетесь в его суть, буферизованные каналы - это просто потокобезопасные очереди. Данные передаются по принципу «первым пришел - первым обслужен», и несколько потоков могут как передавать, так и извлекать из канала без повреждения или состояния гонки.

Итак, структура канала, которую мы собираемся создать, будет содержать три основных элемента.

  1. Очередь, которая действует как наш буфер.
  2. Мьютекс / блокировка для обеспечения безопасности потоков.
  3. Логическое значение для отслеживания его «закрытого» статуса.

К счастью, Golang предоставляет и то, и другое прямо из коробки.

type BufferedChannel struct {
  buf    *buffer
  lock   sync.Mutex
  closed bool
}
func NewChannel(size int) *BufferedChannel {
  if size > 0 {
    return &BufferedChannel{
      buf:   newBuffer(size),
    }
  }
  
  // we won't be covering non-buffered channels in this article
  panic("size of channel needs to be greater than 0")
}

Буфер

Буфер - это структура данных, которая выстраивается в очередь, чтобы ее могли использовать другие горутины. По этой причине мы будем использовать container/list в библиотеке Go, которая представляет собой связанный список.

Однако мы не можем использовать библиотеку как есть.

Следует отметить, что у каналов есть строгий лимит емкости. Поэтому, чтобы соответствовать этому требованию, нам нужно добавить некоторую особую логику в наш связанный список, чтобы он не превышал заранее установленный размер.

type buffer struct {
  q *list.List
  maxLen int
}
func newBuffer(size int) *buffer {
  return &buffer{
    q: new(list.List).Init(),
    maxLen: size,
  }
}

В целом логика буфера будет довольно простой.

Пока базовый связанный список не превышает максимальную длину, установленную при инициализации, он будет добавлять данные в очередь. И пока он не пустой, он удалит данные из базовой очереди.

Создав буфер, мы можем перейти к самим методам канала.

Закрытие канала

Первый и самый простой метод, который мы создадим, - это закрытие канала - как бы нелогично это ни звучало.

Закрытие канала не позволяет горутине отправлять в него данные. Но это не мешает потреблять данные из канала. В этом есть смысл, поскольку мы не хотели бы рисковать тем, что важные данные останутся сиротами.

На обычном языке Go вы бы закрыли канал следующим образом: close(channel)

К счастью, логика закрытия довольно проста:

func (c *BufferedChannel) Close() {
  c.lock.Lock()
  defer c.lock.Unlock()
  c.closed = true
}

Отправка данных

Когда мы добавляем данные в буферизованный канал, они помещаются в буфер, чтобы другая горутина могла их снять. И если канал закрыт, среда выполнения Go вызовет панику.

func (c *BufferedChannel) Send(val interface{}) {
  if c.closed {
    panic("channel closed")
  }
  
  c.lock.Lock()
  
  if !c.buf.IsFull() {
    c.buf.Enqueue(val)
  }
  
  c.lock.Unlock()
}

Получение данных

Когда горутина извлекает данные из канала, они удаляются из буфера канала и записываются в память горутины.

Мы можем имитировать это в наших псевдоканалах, удалив данные из начала нашей очереди и вернув их вызывающей стороне метода.

Еще одна вещь, которую следует отметить в Recv, заключается в том, что горутины все еще могут извлекать данные из закрытого канала. И он вернет значение bool, которое указывает, содержит ли канал все еще данные или нет.

func (c *BufferedChannel) Recv() (interface{}, bool) {
  c.lock.Lock()
  if !c.buf.IsEmpty() {
    val := c.buf.Dequeue()
    c.lock.Unlock()
  
    return val, true
  }
 
  c.lock.Unlock()
  return nil, false
}

Итерация по каналу

Одна из очень полезных вещей в каналах - это то, что вы можете перебирать их, пока они не будут закрыты. В языке Go мы можем добиться этого с помощью ключевого слова range.

for data := range channel {
  fmt.Println(data)
}

Итерация по каналу является блокирующей операцией до тех пор, пока в канале не появятся данные или он не будет закрыт.

К сожалению, мы не можем использовать range для нашего канала, но мы можем предоставить метод, который можно использовать в for цикле.

Метод Next:

for channel.Next() {
  data := channel.Recv()
}

Сложная часть этого метода - добиться эффекта блокировки, если канал пуст. Для этого мы можем использовать бесконечный цикл, который будет спать на короткое время, если канал пуст.

func (c *BufferedChannel) Next() bool {
  for {
    c.lock.Lock()
    if c.closed && c.buf.IsEmpty() {
      c.lock.Unlock()
      return false
    }
    if !c.buf.IsEmpty() {
      c.lock.Unlock()
      return true
    }
    c.lock.Unlock()
    time.Sleep(10 * time.Millisecond)
  } 
}

На данный момент мы сделали первый проход по каналу, и он будет работать при определенных условиях. Однако есть два основных сценария, которые мы еще не учли.

Отправка данных по полному каналу

Каждый раз, когда вы отправляете данные на канал, который заполнен, горутина будет блокироваться, пока в буфере не освободится место. Есть простые способы реализовать это.

Мы могли бы заставить горутины засыпать в определенное время и проверять, заполнен ли еще буфер. Единственная проблема с этим подходом - отсутствие гарантии, что данные будут доставлены в том же порядке, в котором они были отправлены. Это функция, которую гарантируют каналы.

Чтобы у наших каналов была такая же гарантия, нам нужно придумать другой подход. В этом случае мы можем позаимствовать из языка Go.

В Go, когда горутина пытается отправить данные в полный канал, они добавляются в очередь в структуре канала. Это приводит к блокировке горутины.

Когда в буфере становится доступным пространство, среда выполнения Go извлекает первую горутину из очереди и добавляет данные в буфер. Таким образом, разблокировка goroutine.

Поскольку мы не можем использовать среду выполнения go для оркестровки всех горутин в нашем канале, мы не можем использовать именно этот подход. Но есть другой способ.

Мы можем использовать систему продажи билетов. Структура канала будет поддерживать очередь и счетчик. Каждая горутина, которая пытается отправить по полному каналу, будет увеличивать счетчик на канале. Этот номер будет номером его билета. Очередь будет содержать номера билетов по порядку.

type BufferedChannel struct {
  buf         *buffer
  lock        sync.Mutex
  closed      bool
  sendCounter int
  sendQ       *list.List
}

Затем каждая горутина будет блокироваться засыпанием с определенным интервалом. Каждый раз, когда он просыпается, он проверяет, есть ли место в буфере.

Как только горутина обнаруживает, что в буфере есть место, она проверяет, находится ли ее номер билета в начале очереди. Если это так, он поместит данные в буфер, удалит себя из очереди и вернется.

Получение данных по пустому каналу

В Go получение данных из пустого канала работает аналогично отправке данных в полный канал, но с небольшим отклонением.

Каждый раз, когда горутина пытается извлечь данные из пустого канала, она блокируется до тех пор, пока в канале не появятся данные. Горутина добавляется в очередь в структуре каналов для получателей. Но когда другая горутина пытается отправить данные в канал с ожидающими получателями, эти данные не добавляются в буфер.

Вместо этого среда выполнения передает данные непосредственно первой ожидающей горутине, полностью обходя буфер. Это крутая оптимизация, которую команда Go сделала при построении каналов.

К сожалению, мы не можем провести такую ​​же оптимизацию с нашим каналом, так как мы не можем использовать среду выполнения Go. Но это не значит, что мы не можем заставить его работать.

Мы можем использовать ту же систему тикетов, которую мы использовали для отправки данных по полному каналу, но использовать другой счетчик и очередь для принимающих горутин. Потоки, ожидающие данных, будут спать в течение определенного интервала и проверять, есть ли данные в очереди. Затем он проверит, будет ли его очередь принимать данные на основе номера билета.

На данный момент мы создали наш канал, и он готов к использованию! Мы можем использовать его точно так же, как обычный канал Go. Чтобы получить полный код, вы можете поиграть с ним на Go Playground или проверить его на GitHub.



Неудивительно, что он не такой производительный или эффективный с точки зрения памяти, как настоящий.

Но это все же то, на что мы можем повесить шляпу. Я надеюсь, что на этом этапе у вас есть более глубокое понимание каналов Go. Это мощный инструмент для создания сложного программного обеспечения.

В качестве забавной задачи разветвите мое репо и попытайтесь улучшить его еще больше и дать ему новые функции, которых нет даже в каналах Go.

В конце концов, лучший способ учиться - это строить.

Дополнительные ресурсы о каналах Go