diff --git a/concurrency/worker_unbuffed/main.go b/concurrency/worker_unbuffed/main.go index f92f3a0..4c11fdd 100644 --- a/concurrency/worker_unbuffed/main.go +++ b/concurrency/worker_unbuffed/main.go @@ -11,16 +11,19 @@ package main // queue that has no guarantee it will ever be worked on. import ( + "errors" "log" "sync" "time" + + "github.com/davecgh/go-spew/spew" ) // Worker must be implemented by types that want to use // the work pool. // The Worker interface declares a single method called Task type Worker interface { - Task() + Task() error } // Pool provides a pool of goroutines that can execute any Worker @@ -29,14 +32,16 @@ type Worker interface { // pool of goroutines and will have methods that process the work. The type declares two // fields, one named work, which is a channel of the Worker interface type, and a sync.WaitGroup named wg. type Pool struct { - work chan Worker - wg sync.WaitGroup + work chan Worker + wg sync.WaitGroup + errChan chan error } // New creates a new work pool. func New(maxGoroutines int) *Pool { p := Pool{ - work: make(chan Worker), + work: make(chan Worker), + errChan: make(chan error), } p.wg.Add(maxGoroutines) // The for range loop blocks until there’s a Worker interface value to receive on the @@ -46,7 +51,7 @@ func New(maxGoroutines int) *Pool { for i := 0; i < maxGoroutines; i++ { go func() { for w := range p.work { - w.Task() + p.errChan <- w.Task() } p.wg.Done() }() @@ -60,8 +65,12 @@ func New(maxGoroutines int) *Pool { // work channel. Since the work channel is an unbuffered channel, the caller must wait // for a goroutine from the pool to receive it. This is what we want, because the caller // needs the guarantee that the work being submitted is being worked on once the call to Run returns. -func (p *Pool) Run(w Worker) { +func (p *Pool) Run(w Worker) (err error) { p.work <- w + select { + case err = <-p.errChan: + } + return } // Shutdown waits for all the goroutines to shutdown. @@ -71,6 +80,7 @@ func (p *Pool) Run(w Worker) { // they have terminated. func (p *Pool) Shutdown() { close(p.work) + close(p.errChan) p.wg.Wait() } @@ -78,8 +88,10 @@ var names = []string{ "steve", "bob", "mary", - "therese", "jason", + "Bob", + "Lee", + "Jane", } // namePrinter provides special support for printing names. @@ -88,9 +100,13 @@ type namePrinter struct { } // Task implements the Worker interface. -func (m *namePrinter) Task() { +func (m *namePrinter) Task() error { + time.Sleep(time.Second * 1) + if m.name == "jason" { + return errors.New("Invalid name") + } log.Println(m.name) - time.Sleep(time.Second) + return nil } // main is the entry point for all Go programs. @@ -98,22 +114,23 @@ func main() { // Create a work pool with 2 goroutines. p := New(2) var wg sync.WaitGroup - wg.Add(100 * len(names)) - for i := 0; i < 100; i++ { - // Iterate over the slice of names. - for _, name := range names { - // Create a namePrinter and provide the - // specific name. - np := namePrinter{ - name: name, - } - go func() { - // Submit the task to be worked on. When RunTask - // returns we know it is being handled. - p.Run(&np) - wg.Done() - }() + wg.Add(len(names)) + // Iterate over the slice of names. + for _, name := range names { + // Create a namePrinter and provide the + // specific name. + np := namePrinter{ + name: name, } + go func() { + // Submit the task to be worked on. When RunTask + // returns we know it is being handled. + if err := p.Run(&np); err != nil { + spew.Dump(err) + } + wg.Done() + }() + } wg.Wait() // Shutdown the work pool and wait for all existing work