Объединение нескольких результатов из подпрограмм go в один массив

У меня есть следующая функция, которая запускает заданное количество подпрограмм go

func (r *Runner) Execute() {
    var wg sync.WaitGroup
    wg.Add(len(r.pipelines))
    for _, p := range r.pipelines {
        go executePipeline(p, &wg)
    }

    wg.Wait()

    errs := ....//contains list of errors reported by any/all go routines

}

Я думал, что может быть какой-то способ с каналами, но я не могу понять это.


person TheJediCowboy    schedule 17.08.2017    source источник
comment
Создайте срез размером len(r.pipelines) и позвольте каждому воркеру писать в соответствующий ему индекс.   -  person zerkms    schedule 17.08.2017
comment
Существует шаблон для объединения результатов некоторых каналов в один с именем Fan In. Вы можете использовать подход в этом шаблоне (не обязательно в самом шаблоне).   -  person Kaveh Shahbazian    schedule 17.08.2017


Ответы (2)


Один из способов сделать это — использовать мьютексы, если вы можете сделать ошибки возврата executePipeline:

// ...
for _, p := range r.pipelines {
    go func(p pipelineType) {
        if err := executePipeline(p, &wg); err != nil {
            mu.Lock()
            errs = append(errs, err)
            mu.UnLock()
        }
    }(p)
}

Чтобы использовать канал, у вас может быть отдельный список горутин для ошибок:

errCh := make(chan error)

go func() {
    for e := range errCh {
        errs = append(errs, e)
    }
}

и в функции Execute внесите следующие изменения:

// ...
wg.Add(len(r.pipelines))
for _, p := range r.pipelines {
    go func(p pipelineType) {
        if err := executePipeline(p, &wg); err != nil {
            errCh <- err
        }
    }(p)
}

wg.Wait()
close(errCh)

Вы всегда можете использовать метод @zerkms, указанный выше, если количество горутин невелико.

вместо того, чтобы возвращать ошибку из executePipleline и использовать оболочку анонимной функции, вы всегда можете внести вышеуказанные изменения в самой функции.

person abhink    schedule 17.08.2017

Вы можете использовать каналы, как предложил @Kaveh Shahbazian:

func (r *Runner) Execute() {
    pipelineChan := makePipeline(r.pipelines)

    for cnt := 0; cnt < len(r.pipelines); cnt++{
        //recieve from channel
        p := <- pipelineChan
        //do something with the result 
    }
}

func makePipeline(pipelines []pipelineType) <-chan pipelineType{
    pipelineChan := make(chan pipelineType)

    go func(){
        for _, p := range pipelines {
           go func(p pipelineType){
              pipelineChan <- executePipeline(p)
           }(p)
        }
    }()
    return pipelineChan
}

См. этот пример: https://gist.github.com/steven-ferrer/9b2eeac3eed3f7667e8976f399d0b8ad

person Steven Ferrer    schedule 17.08.2017