added timeout to fetchers

This commit is contained in:
Jian Han 2018-01-06 16:55:59 +10:00
parent 81fe56d468
commit f50cc7cb23
2 changed files with 29 additions and 9 deletions

View File

@ -30,6 +30,7 @@ func RunDivideAndConquer() {
evaluators := []Evaluator{ evaluators := []Evaluator{
EvaluatorFunc(func(inV interface{}) (interface{}, error) { EvaluatorFunc(func(inV interface{}) (interface{}, error) {
time.Sleep(time.Second * 1)
i := inV.(in) i := inV.(in)
r := i.a + i.b r := i.a + i.b
return out{"Plus", r}, nil return out{"Plus", r}, nil
@ -83,11 +84,18 @@ func DivideAndConquer(data interface{}, evaluators []Evaluator, timeout time.Dur
go func() { go func() {
result, err := e.Evaluate(data) result, err := e.Evaluate(data)
if err != nil { if err != nil {
errors <- err ech <- err
} else { } else {
ch <- result ch <- result
} }
}() }()
// Remember that an unbuffered channel pauses the writing goroutine until theres a read by another
// goroutine. If the timeout triggers before the Evaluator finishes executing,
// the read will never happen because the only place those channels are read is in the outer
// goroutines select statement, and the outer goroutine exited after the timeout triggered.
// This means that using an unbuffered channel will cause the inner goroutine to wait forever
// whenever there is a timeout, leaking the goroutine. Again, the buffered channel proves
// useful because we know exactly how many writes we can expect.
select { select {
case r := <-ch: case r := <-ch:
gather <- r gather <- r

View File

@ -34,7 +34,7 @@ type BingFetcher struct {
} }
func (b *BingFetcher) Fetch(url string) (string, error) { func (b *BingFetcher) Fetch(url string) (string, error) {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 10)
return fmt.Sprintf("%s is fetching %s", b.Name, url), nil return fmt.Sprintf("%s is fetching %s", b.Name, url), nil
} }
@ -51,7 +51,7 @@ type DuckDuckGoFetcher struct {
} }
func (d *DuckDuckGoFetcher) Fetch(url string) (string, error) { func (d *DuckDuckGoFetcher) Fetch(url string) (string, error) {
time.Sleep(time.Second * 2) time.Sleep(time.Second * 10)
return fmt.Sprintf("%s is fetching %s", d.Name, url), nil return fmt.Sprintf("%s is fetching %s", d.Name, url), nil
} }
@ -67,11 +67,23 @@ func FetchResults(url string, fetchers []Fetcher, timeout time.Duration) ([]stri
chErr := make(chan error, len(fetchers)) chErr := make(chan error, len(fetchers))
for _, f := range fetchers { for _, f := range fetchers {
go func(f Fetcher) { go func(f Fetcher) {
ch := make(chan string, 1)
eCh := make(chan error, 1)
go func() {
s, err := f.Fetch(url) s, err := f.Fetch(url)
if err != nil { if err != nil {
chErr <- err eCh <- err
} else { } else {
chStr <- s ch <- s
}
}()
select {
case r := <-ch:
chStr <- r
case err := <-eCh:
chErr <- err
case <-time.After(timeout):
chErr <- fmt.Errorf("%s timeout after %v on %v", f.GetName(), timeout, url)
} }
}(f) }(f)
} }
@ -89,7 +101,7 @@ func FetchResults(url string, fetchers []Fetcher, timeout time.Duration) ([]stri
} }
func RunFetchers() { func RunFetchers() {
fetchers := []Fetcher{NewGoogleFetcher("Google"), NewGoogleFetcher("Bing"), NewGoogleFetcher("Duck Duck Go")} fetchers := []Fetcher{NewGoogleFetcher("Google"), NewBingFetcher("Bing"), NewDuckDuckGoFetcherFetcher("Duck Duck Go")}
r, e := FetchResults("http://www.abc.com", fetchers, time.Millisecond*100) r, e := FetchResults("http://www.abc.com", fetchers, time.Second*2)
spew.Dump(r, e) spew.Dump(r, e)
} }