Общее сопоставление массива с другим массивом с использованием подпрограмм Go и Go

Каковы преимущества 'Map'?

Представьте себе массив objects с n количеством записей. Вы хотите, чтобы каждая запись в массиве обрабатывалась, но каждый процесс занимает три секунды. Вы обязательно начинаете писать код.

arr := [...]interface{..., ..., ..., ...}
for i, entry := range arr {
    doSomething(i, entry)
}

Сейчас doSomething тяжелоатлет. Если для обработки каждой записи, выполненной в обязательном порядке, требуется около трех секунд, общее время (t), необходимое для обработки n количества массивов, составляет:

t = N * 3s

Имея функцию Map, мы можем попытаться обработать каждую запись отдельно в goroutine.

«ParallelMap» в Go

Представьте себе функцию с именем ParallelMap. Принимает два параметра:

  • Array любого объекта. Назовем это source.
  • Function, который принимает запись source и возвращает новое значение / тип после обработки записи. Назовем это transform.

Он возвращает:

  • Еще один array объектов, являющийся результатом function преобразования записи source.
// ParallelMap an array of something into another thing using go routine
// Example:
//  Map([]int{1,2,3}, func(num int) int { return num+1 })
//  Results: []int{2,3,4}
func ParallelMap(source interface{}, transform interface{}) (interface{}, error) {
    // TODO:
}

1. Убедитесь, что тип источника

sourceV := reflect.ValueOf(source)
kindOf := sourceV.Kind()
if kindOf != reflect.Slice && kindOf != reflect.Array {
    return nil, errors.New("Source value is not an array")
}

2. Убедитесь, что Transform не равно нулю и является функцией

if transform == nil {
    return nil, errors.New("Transform function cannot be nil")
}
tv := reflect.ValueOf(transform)
if tv.Kind() != reflect.Func {
    return nil, errors.New("Transform argument must be a function")
}

3. Подготовьте контейнер результатов.

Здесь мы сталкиваемся с трудностью: мы хотим создать контейнер результатов, но не знаем, какой тип записи в массиве результатов.

Вызывающий знает тип результата, поэтому мы меняем сигнатуру нашей ParallelMap функции на:

func ParallelMap(source interface{}, transform interface{}, T reflect.Type) (interface{}, error) {

Давайте добавим некоторую проверку и приступим к созданию контейнера результатов.

if T == nil {
    return nil, errors.New("Map result type cannot be nil")
}
// kinda equivalent to = make([]T, srcV.Len())
result := reflect.MakeSlice(reflect.SliceOf(T), srcV.Len(), srcV.Cap())

4. Создание петли

// create a waitgroup with length = source array length
// we'll reduce the counter each time an entry finished processing
wg := &sync.WaitGroup{}
wg.Add(srcV.Len())
// for each entry in source array
for i := 0; i < srcV.Len(); i++ {
    // one go routine for each entry
    go func(idx int, entry reflect.Value) {
        //Call the transformation and store the result value
        tfResults := tv.Call([]reflect.Value{entry})
//Store the transformation result into array of result
        resultEntry := result.Index(idx)
        if len(tfResults) > 0 {
            resultEntry.Set(tfResults[0])
        } else {
            resultEntry.Set(reflect.Zero(T))
        }
//this go routine is done
        wg.Done()
    }(i, srcV.Index(i))
}

5. Подождите и вернитесь

wg.Wait()
return result.Interface(), nil

6. Сшить все вместе

// Map Error Collection
var (
    ErrMapSourceNotArray   = errors.New("Input value is not an array")
    ErrMapTransformNil     = errors.New("Transform function cannot be nil")
    ErrMapTransformNotFunc = errors.New("Transform argument must be a function")
    ErrMapResultTypeNil    = errors.New("Map result type cannot be nil")
)
// ParallelMap an array of something into another thing using go routine
// Example:
//  Map([]int{1,2,3}, func(num int) int { return num+1 }, reflect.Type(1))
//  Results: []int{2,3,4}
func ParallelMap(source interface{}, transform interface{}, T reflect.Type) (interface{}, error) {
    srcV := reflect.ValueOf(source)
    kind := srcV.Kind()
    if kind != reflect.Slice && kind != reflect.Array {
        return nil, ErrMapSourceNotArray
    }
if transform == nil {
        return nil, ErrMapTransformNil
    }
tv := reflect.ValueOf(transform)
    if tv.Kind() != reflect.Func {
        return nil, ErrMapTransformNotFunc
    }
if T == nil {
        return nil, ErrMapResultTypeNil
    }
// kinda equivalent to = make([]T, srcv.Len())
    result := reflect.MakeSlice(reflect.SliceOf(T), srcV.Len(), srcV.Cap())
// create a waitgroup with length = source array length
    // we'll reduce the counter each time an entry finished processing
    wg := &sync.WaitGroup{}
    wg.Add(srcV.Len())
    for i := 0; i < srcV.Len(); i++ {
        // one go routine for each entry
        go func(idx int, entry reflect.Value) {
            //Call the transformation and store the result value
            tfResults := tv.Call([]reflect.Value{entry})
//Store the transformation result into array of result
            resultEntry := result.Index(idx)
            if len(tfResults) > 0 {
                resultEntry.Set(tfResults[0])
            } else {
                resultEntry.Set(reflect.Zero(T))
            }
//this go routine is done
            wg.Done()
        }(i, srcV.Index(i))
    }
wg.Wait()
    return result.Interface(), nil
}

Обеспечение поведения «ParallelMap» с помощью модульного теста

func TestParallelMap(t *testing.T) {
    type args struct {
        arr       interface{}
        transform interface{}
        t         reflect.Type
    }
    tests := []struct {
        name    string
        args    args
        want    interface{}
        wantErr bool
    }{
        {
            name:    "Argument is not an array",
            args:    args{arr: 1, transform: nil, t: nil},
            want:    nil,
            wantErr: true,
        },
        {
            name:    "Transform function is nil",
            args:    args{arr: []int{1, 2, 3}, transform: nil, t: nil},
            want:    nil,
            wantErr: true,
        },
        {
            name:    "Transform is not a function",
            args:    args{arr: []int{1, 2, 3}, transform: 1, t: nil},
            want:    nil,
            wantErr: true,
        },
        {
            name:    "T is not supplied",
            args:    args{arr: []int{1, 2, 3}, transform: 1, t: nil},
            want:    nil,
            wantErr: true,
        },
        {
            name: "Valid transform",
            args: args{arr: []int{1, 2, 3}, transform: func(num int) int {
                return num + 1
            }, t: reflect.TypeOf(1)},
            want:    []int{2, 3, 4},
            wantErr: false,
        },
        {
            name: "Valid transform",
            args: args{arr: []int{1, 2, 3}, transform: func(num int) string {
                return strconv.Itoa(num)
            }, t: reflect.TypeOf("")},
            want:    []string{"1", "2", "3"},
            wantErr: false,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            got, err := ParallelMap(tt.args.arr, tt.args.transform, tt.args.t)
            if (err != nil) != tt.wantErr {
                t.Errorf("Map() error = %v, wantErr %v", err, tt.wantErr)
                return
            }
            if !reflect.DeepEqual(got, tt.want) {
                t.Errorf("Map() = %v, want %v", got, tt.want)
            }
        })
    }
}

И результаты теста показывают:

Running tool: /usr/local/opt/go/libexec/bin/go test -timeout 30s github.com/bastianrob/arrayutil -run ^(TestParallelMap)$
ok      github.com/bastianrob/arrayutil    0.005s
Success: Tests passed.

Как дела?

Что ж, на самом деле код по-прежнему является императивным стилем, заключенным в ParallelMap функцию, использующую goroutine для достижения параллелизма. (Может, лучше назвать его ConcurrentMap?)

Но как это соотносится с воображаемым случаем в начале этой бессмысленной статьи? Давайте протестируем это:

func BenchmarkParallelMap(b *testing.B) {
    source := [100]int{}
    for i := 0; i < len(source); i++ {
        source[i] = i + 1
    }
    transf := func(num int) int {
        //fake, long running operations
        time.Sleep(20 * time.Millisecond)
        return num + 1
    }
b.ResetTimer()
    for n := 0; n < b.N; n++ {
        ParallelMap(source, transf, reflect.TypeOf(1))
    }
}
func BenchmarkImperative(b *testing.B) {
    source := [100]int{}
    for i := 0; i < len(source); i++ {
        source[i] = i + 1
    }
    transf := func(num int) int {
        //fake, long running operations
        time.Sleep(20 * time.Millisecond)
        return num + 1
    }
b.ResetTimer()
    for n := 0; n < b.N; n++ {
        for _, num := range source {
            transf(num)
        }
    }
}

Результат Benchmark показывает:

BenchmarkParallelMap            100      22908438 ns/op       13801 B/op         305 allocs/op
PASS
ok      github.com/bastianrob/arrayutil    2.321s
Success: Benchmarks passed.
BenchmarkImperative               1      2251692199 ns/op      1240 B/op           7 allocs/op
PASS
ok      github.com/bastianrob/arrayutil    2.258s
Success: Benchmarks passed.

Мы можем закончить 100 ParallelMap секунд, в то время как стандартный императивный стиль завершает только одну.

Все это вы можете найти на моем GitHub.