diff --git a/concurrency/subtasks/main.go b/concurrency/subtasks/main.go index 9ff9289..947a1cd 100644 --- a/concurrency/subtasks/main.go +++ b/concurrency/subtasks/main.go @@ -1,6 +1,13 @@ package main -import "github.com/davecgh/go-spew/spew" +import ( + "fmt" + "reflect" + "runtime" + "time" + + "github.com/davecgh/go-spew/spew" +) // https://medium.com/capital-one-developers/buffered-channels-in-go-what-are-they-good-for-43703871828 @@ -11,36 +18,83 @@ import "github.com/davecgh/go-spew/spew" // ideal way to gather the data back from your subtasks. func main() { - evaluators := []Evaluator{google, yahoo, bing} - data := "Query" - r, e := DivideAndConquer(data, evaluators) - spew.Dump(r, e) + type in struct { + a int + b int + } + + type out struct { + source string + result int + } + + evaluators := []Evaluator{ + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a + i.b + return out{"Plus", r}, nil + }), + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a * i.b + return out{"Multi", r}, nil + }), + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a - i.b + return out{"min", r}, nil + }), + EvaluatorFunc(func(inV interface{}) (interface{}, error) { + i := inV.(in) + r := i.a / i.b + return out{"divider", r}, nil + }), + } + + r, errors := DivideAndConquer(in{2, 3}, evaluators, 10*time.Millisecond) + spew.Dump(r, errors) } -var google = func(data interface{}) (interface{}, error) { - return data, nil +type Evaluator interface { + Evaluate(data interface{}) (interface{}, error) + Name() string +} +type EvaluatorFunc func(interface{}) (interface{}, error) + +func (ef EvaluatorFunc) Evaluate(in interface{}) (interface{}, error) { + return ef(in) } -var yahoo = func(data interface{}) (interface{}, error) { - return data, nil +func (ef EvaluatorFunc) Name() string { + return runtime.FuncForPC(reflect.ValueOf(ef).Pointer()).Name() } -var bing = func(data interface{}) (interface{}, error) { - return data, nil -} - -type Evaluator func(data interface{}) (interface{}, error) - -func DivideAndConquer(data interface{}, evaluators []Evaluator) ([]interface{}, []error) { +func DivideAndConquer(data interface{}, evaluators []Evaluator, timeout time.Duration) ([]interface{}, []error) { gather := make(chan interface{}, len(evaluators)) errors := make(chan error, len(evaluators)) for _, v := range evaluators { go func(e Evaluator) { - result, err := e(data) - if err != nil { + // Why not just use an unbuffered channel? The answer is that we don’t want to leak any goroutines. + // While the Go runtime is capable of handling thousands or hundreds of thousands of goroutines at a time, + // each goroutine does use some resources, so you don’t want to leave them hanging around when + // you don’t have to. If you do, a long-running Go program will start performing poorly + ch := make(chan interface{}, 1) + ech := make(chan error, 1) + go func() { + result, err := e.Evaluate(data) + if err != nil { + errors <- err + } else { + ch <- result + } + }() + select { + case r := <-ch: + gather <- r + case err := <-ech: errors <- err - } else { - gather <- result + case <-time.After(timeout): + errors <- fmt.Errorf("%s timeout after %v on %v", e.Name(), timeout, data) } }(v) } diff --git a/concurrency/subtasks/subtasks b/concurrency/subtasks/subtasks new file mode 100755 index 0000000..55a9b4c Binary files /dev/null and b/concurrency/subtasks/subtasks differ