From f50cc7cb23b589ff9757e0ad2a6a4927b3ded3fa Mon Sep 17 00:00:00 2001
From: Jian Han
Date: Sat, 6 Jan 2018 16:55:59 +1000
Subject: [PATCH] added timeout to fetchers

---
 .../divide_and_conquer/divide_and_conquer.go | 10 ++++++-
 concurrency/subtasks/fetchers/fetchers.go    | 28 +++++++++++++------
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go b/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go
index a222445..021bd4b 100644
--- a/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go
+++ b/concurrency/subtasks/divide_and_conquer/divide_and_conquer.go
@@ -30,6 +30,7 @@ func RunDivideAndConquer() {
 
 	evaluators := []Evaluator{
 		EvaluatorFunc(func(inV interface{}) (interface{}, error) {
+			time.Sleep(time.Second * 1)
 			i := inV.(in)
 			r := i.a + i.b
 			return out{"Plus", r}, nil
@@ -83,11 +84,18 @@ func DivideAndConquer(data interface{}, evaluators []Evaluator, timeout time.Dur
 		go func() {
 			result, err := e.Evaluate(data)
 			if err != nil {
-				errors <- err
+				ech <- err
 			} else {
 				ch <- result
 			}
 		}()
+		// Remember that an unbuffered channel pauses the writing goroutine until there’s a read by another
+		// goroutine. If the timeout triggers before the Evaluator finishes executing,
+		// the read will never happen because the only place those channels are read is in the outer
+		// goroutine’s select statement, and the outer goroutine exited after the timeout triggered.
+		// This means that using an unbuffered channel will cause the inner goroutine to wait forever
+		// whenever there is a timeout, leaking the goroutine. Again, the buffered channel proves
+		// useful because we know exactly how many writes we can expect.
		select {
 		case r := <-ch:
 			gather <- r
diff --git a/concurrency/subtasks/fetchers/fetchers.go b/concurrency/subtasks/fetchers/fetchers.go
index 0e413ba..4572c3a 100644
--- a/concurrency/subtasks/fetchers/fetchers.go
+++ b/concurrency/subtasks/fetchers/fetchers.go
@@ -34,7 +34,7 @@ type BingFetcher struct {
 }
 
 func (b *BingFetcher) Fetch(url string) (string, error) {
-	time.Sleep(time.Second * 3)
+	time.Sleep(time.Second * 10)
 	return fmt.Sprintf("%s is fetching %s", b.Name, url), nil
 }
 
@@ -51,7 +51,7 @@ type DuckDuckGoFetcher struct {
 }
 
 func (d *DuckDuckGoFetcher) Fetch(url string) (string, error) {
-	time.Sleep(time.Second * 2)
+	time.Sleep(time.Second * 10)
 	return fmt.Sprintf("%s is fetching %s", d.Name, url), nil
 }
 
@@ -67,11 +67,23 @@ func FetchResults(url string, fetchers []Fetcher, timeout time.Duration) ([]stri
 	chErr := make(chan error, len(fetchers))
 	for _, f := range fetchers {
 		go func(f Fetcher) {
-			s, err := f.Fetch(url)
-			if err != nil {
+			ch := make(chan string, 1)
+			eCh := make(chan error, 1)
+			go func() {
+				s, err := f.Fetch(url)
+				if err != nil {
+					eCh <- err
+				} else {
+					ch <- s
+				}
+			}()
+			select {
+			case r := <-ch:
+				chStr <- r
+			case err := <-eCh:
 				chErr <- err
-			} else {
-				chStr <- s
+			case <-time.After(timeout):
+				chErr <- fmt.Errorf("%s timeout after %v on %v", f.GetName(), timeout, url)
 			}
 		}(f)
 	}
@@ -89,7 +101,7 @@ func FetchResults(url string, fetchers []Fetcher, timeout time.Duration) ([]stri
 }
 
 func RunFetchers() {
-	fetchers := []Fetcher{NewGoogleFetcher("Google"), NewGoogleFetcher("Bing"), NewGoogleFetcher("Duck Duck Go")}
-	r, e := FetchResults("http://www.abc.com", fetchers, time.Millisecond*100)
+	fetchers := []Fetcher{NewGoogleFetcher("Google"), NewBingFetcher("Bing"), NewDuckDuckGoFetcher("Duck Duck Go")}
+	r, e := FetchResults("http://www.abc.com", fetchers, time.Second*2)
 	spew.Dump(r, e)
 }
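
Note (not part of the patch): the FetchResults hunk above applies the same buffered-channel timeout pattern that the comment in divide_and_conquer.go describes. Below is a minimal, self-contained sketch of that pattern for reference; the names slowWork and runWithTimeout are illustrative only and do not exist in this repository.

package main

import (
	"fmt"
	"time"
)

// slowWork stands in for a slow Fetcher.Fetch call (hypothetical helper).
func slowWork() (string, error) {
	time.Sleep(3 * time.Second)
	return "done", nil
}

// runWithTimeout runs slowWork in its own goroutine and gives up after timeout.
func runWithTimeout(timeout time.Duration) (string, error) {
	ch := make(chan string, 1) // buffered: the sender never blocks, even after a timeout
	eCh := make(chan error, 1) // same reasoning: at most one write on each channel
	go func() {
		s, err := slowWork()
		if err != nil {
			eCh <- err
		} else {
			ch <- s
		}
	}()
	select {
	case s := <-ch:
		return s, nil
	case err := <-eCh:
		return "", err
	case <-time.After(timeout):
		return "", fmt.Errorf("timed out after %v", timeout)
	}
}

func main() {
	fmt.Println(runWithTimeout(time.Second)) // prints the empty result and the timeout error
}

Because ch and eCh have capacity 1, the inner goroutine can always complete its single send and exit, even when the select has already taken the time.After branch; that is exactly why the patch switches FetchResults to per-fetcher buffered channels instead of writing straight into the shared result channels.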