Как правильно закрыть общие каналы в горутинах при обработке результатов

Я пытаюсь найти правильный метод использования результатов, полученных с помощью подпрограмм worker go, а также изящно выйти из цикла результатов, когда вся работа будет завершена. Для иллюстрации я привел следующий пример. Мой реальный случай немного отличается от этого примера в том смысле, что я не знаю, сколько «работы» вернет каждая подпрограмма worker go, хотя очевидно, что эти циклы for выполняют фиксированное количество результатов (5).

Я новичок в горутинах и каналах, но следующие основные клиенты, как я их понимаю;

  • Только отправители должны закрывать канал
  • Выполнение range над каналом будет продолжаться, пока канал не будет закрыт
package main

import (
    "fmt"
    "sync"
)

func worker1(r chan string, wg *sync.WaitGroup) {
    for i := 0; i < 5; i++ {
        r <- fmt.Sprintf("1.%d", i)
    }

    wg.Done()
}

func worker2(r chan string, wg *sync.WaitGroup) {
    for i := 0; i < 5; i++ {
        r <- fmt.Sprintf("2.%d", i)
    }

    wg.Done()
}

func main() {
    var wg sync.WaitGroup

    r := make(chan string)
    wg.Add(2)
    go worker1(r, &wg)
    go worker2(r, &wg)

    for i := range r {
        fmt.Printf("Got job result: %s\n", i)
    }

    wg.Wait()
}

Этот пример является взаимоблокирующим, потому что цикл диапазона никогда не завершается, так как канал никогда не закрывается. Я могу закрыть канал после завершения работы (т.е. заменить wg.Done() на close(r)), но затем у меня возникает паника, когда другая рабочая горутина пытается отправить дальнейшие результаты в уже закрытый канал.

Наконец, я полагаю, что могу переместить wg.Wait() выше цикла результатов, закрыть канал после его завершения и затем начать печать результатов, но это означает, что я не могу распечатать какие-либо результаты, пока вся работа не будет завершена во всех потоках.

Как правильно выйти из цикла результатов после завершения всех рабочих потоков, не дожидаясь завершения всей работы перед началом печати результатов?


person Peleus    schedule 24.11.2019    source источник
comment
ты go func() { wg.Wait();close(r); }()   -  person mh-cbon    schedule 24.11.2019
comment
Думайте о wg WaitGroup как о счетчике для группы писателей. Вам нужен некоторый набор сущностей (в совокупности, писателей), которые пишут в канал и закрывают его, когда закончили. Чтобы сделать это простым, у вас может быть n (n == 2 на данный момент) писателей, которые действительно пишут, плюс один дополнительный член группы писателей, не пишущих, который просто ждет, пока другие писатели скажут «Готово» и затем делает close. Это анонимная функция в комментарии mh-cbon и ответе Ника Корина.   -  person torek    schedule 25.11.2019


Ответы (1)


Я отредактировал ваш код, чтобы он работал без тупиков. Проблема в том, что прием на канале блокирует основной поток, и ни один из ваших двух goroutines не отправляет больше данных.

Это решение запускает новый goroutine, который закрывает канал результатов после завершения WaitGroup.

package main

import (
    "fmt"
    "sync"
)

func worker1(r chan string, wg *sync.WaitGroup) {
    for i := 0; i < 5; i++ {
        r <- fmt.Sprintf("1.%d", i)
    }

    wg.Done()
}

func worker2(r chan string, wg *sync.WaitGroup) {
    for i := 0; i < 5; i++ {
        r <- fmt.Sprintf("2.%d", i)
    }

    wg.Done()
}

func main() {
    var wg sync.WaitGroup

    r := make(chan string)
    wg.Add(2)
    go worker1(r, &wg)
    go worker2(r, &wg)

    go func() {
        defer close(r)
        wg.Wait()
    }()

    for i := range r {
        fmt.Printf("Got job result: %s\n", i)
    }
}

(Go Playground)

person Nick Corin    schedule 24.11.2019
comment
Спасибо! Извините, я не придумал ничего такого простого. Принято во внимание, теперь все имеет смысл. - person Peleus; 25.11.2019