Отсутствуют распечатки из горутины после закрытия готового канала

Я запускал следующий код на основе примера из книги «Параллелизм в Go», когда заметил, что не все закрывающие распечатки в конвейере печатаются.
Посмотрите, что «выполнено умножение!» отсутствует.
С другой стороны, NumGoroutine () показывает, что выполняется только основная функция.
Что не так со следующим кодом? (https://play.golang.org/p/tkFgvKboVgS)

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    generator := func(done <-chan struct{}) <-chan int {
        intStream := make(chan int)
        i:=0
        go func() {
            defer close(intStream)
            for {
                select {
                case <-done:
                    fmt.Println("done generator!")
                    return
                case intStream <- i:
                    time.Sleep(1 * time.Second)
                    i++
                }
                fmt.Println("generator after select")
            }
        }()
        return intStream
    }

    multiply := func(
        done <-chan struct{},
        intStream <-chan int,
        multiplier int,
    ) <-chan int {
        multipliedStream := make(chan int)
        go func() {
            defer close(multipliedStream)
            for i := range intStream {
                select {
                case <-done:
                    fmt.Println("done multiply !")
                    return
                case multipliedStream <- i * multiplier:
                }
                fmt.Println("multiply after select")
            }
        }()
        return multipliedStream
    }
    add := func(
        done <-chan struct{},
        intStream <-chan int,
        additive int,
    ) <-chan int {
        addedStream := make(chan int)
        go func() {
            defer close(addedStream)
            for i := range intStream {
                select {
                case <-done:
                    fmt.Println("done add !")
                    return
                case addedStream <- i + additive:
                }
                fmt.Println("add after select")
            }
        }()
        return addedStream
    }

    done := make(chan struct{})

    intStream := generator(done)
    pipeline := add(done, multiply(done, intStream, 2), 2)
    go func() {
        time.Sleep(3 * time.Second)
        close(done)
        fmt.Println("Closed done")
    }()
    for v := range pipeline {
        fmt.Println(v)
    }
    fmt.Println("finished iterating pipeline")
    time.Sleep(10 * time.Second)
    fmt.Println("ramaining goroutines:", runtime.NumGoroutine())
    fmt.Println("finished!")
}

Выход:

add after select
2
multiply after select
generator after select
multiply after select
add after select
4
generator after select
multiply after select
add after select
6
generator after select
Closed done
multiply after select
done add !
finished iterating pipeline
generator after select
done generator!
ramaining goroutines: 1
finished!

person Avner Levy    schedule 29.12.2019    source источник


Ответы (2)


Существуют пути кода, по которым не печатаются некоторые сообщения done. Планировщик выбрал тот, который не печатает для multiply. Если вы немного измените код (например, войдите в журнал в разных экземплярах, чем вы делаете сейчас), вы увидите, что он также может пропустить сообщение add done. (https://play.golang.org/p/meEPM5GR9Rr). Причина вот в чем:

Если сообщение done приходит сразу после того, как генератор записывает число в канал, а множитель читает его, то мультипликатор видит, что done доступен, и выбирает его. Это тот случай, когда multiplier печатает сообщение done. Если сообщение done приходит, когда multiplier ожидает в цикле for, тогда multiplier получит закрытие на входном канале (не на канале done), в результате чего цикл for завершится без печати сообщения done.

Проблема возникает из-за того, что вы читаете из канала в цикле for, а затем выбираете. В ожидании чтения цикла for из канала ни одно из событий, связанных с выбором, не оценивается.

Лучший способ справиться с этим - не использовать цикл for для чтения из канала. Например:

for {
     select {
        case <-done:
           return
        case i, ok:= <-intstream:
           if !ok {
              return
           }
           select {
               case <- done:
                    return
               case addedStream <- i + additive:
           }
     }
}
person Burak Serdar    schedule 29.12.2019

Ваши подпрограммы add и multiply - это не вечные циклы, а скорее for ... range циклы. Таким образом, в верхней части каждого цикла они ждут следующего целого числа, а не в select, который либо получает закрытие от done, либо отправляет результат в свой поток. Это не проблема, но это означает, что если их входной поток закрыт, они вернутся, не входя в сам цикл.

Если я добавлю fmt.Println вызовов, чтобы показать точку, в которой они выходят из-за достижения конца их входного потока , поведение немного меняется (вероятно, из-за времени; я особо не удосужился рассуждать об этом, и Бурак Сердар опубликовал свой ответ уже тогда, когда я это набирал), и результат будет следующим:

add after select
2
multiply after select
generator after select
multiply after select
add after select
4
generator after select
multiply after select
add after select
6
generator after select
Closed done
done multiply !
add got end of stream - done!
finished iterating pipeline
generator after select
done generator!
ramaining goroutines: 1
finished!

Часто более разумно сделать так, чтобы только сам генератор принимал done сигнал, а конвейерные функции всегда записывали все свои результаты, что делает их более предсказуемыми. Конечно, тогда тот, кто читает каждую трубу, должен прочитать до конца - но вы делаете это уже в основной горутине, поэтому мы просто распространяем это повсюду. Вот упрощенная версия вашего кода, которая делает это таким образом; он выводит:

2
generator after select
4
generator after select
6
generator after select
Closed done
8
generator after select
done generator!
multiply got end of stream - done!
add got end of stream - done!
finished iterating pipeline
remaining goroutines: 1

Обратите внимание, что на этот раз мы получаем окончательное вычисленное значение (8) из окончательного сгенерированного значения (3).

person torek    schedule 29.12.2019