diff --git a/concurrency/feedreader/main.go b/concurrency/feedreader/main.go
new file mode 100644
index 0000000..6be5499
--- /dev/null
+++ b/concurrency/feedreader/main.go
@@ -0,0 +1,610 @@
// +build OMIT

// realmain runs the Subscribe example with a real RSS fetcher.
package main

import (
	"fmt"
	"math/rand"
	"time"

	rss "github.com/jteeuwen/go-pkg-rss"
)

// STARTITEM OMIT
// An Item is a stripped-down RSS item.
type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted. On failure, Fetch returns a non-nil error.
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel. Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
// The returned Subscription is serviced by a single goroutine (s.loop),
// which owns all mutable state; clients interact with it only via channels.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),       // for Updates
		closing: make(chan chan error), // for Close
	}
	go s.loop()
	return s
}

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close: carries a reply channel per request
}

// STARTUPDATES OMIT
func (s *sub) Updates() <-chan Item {
	return s.updates
}

// STOPUPDATES OMIT

// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
// Close asks loop to exit and waits for its reply: it sends a fresh error
// channel on s.closing, and loop answers with the last fetch error (if any)
// before closing s.updates and returning.
func (s *sub) Close() error {
	// STOPCLOSESIG OMIT
	errc := make(chan error)
	s.closing <- errc // HLchan
	return <-errc     // HLchan
}

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
// NOTE(review): the three loopXxxOnly functions below are pedagogical
// decompositions of loop; apart from the close case they never return.
func (s *sub) loopCloseOnly() {
	// STARTCLOSEONLY OMIT
	var err error // set when Fetch fails
	for {
		select {
		case errc := <-s.closing: // HLchan
			errc <- err      // HLchan
			close(s.updates) // tells receiver we're done
			return
		}
	}
	// STOPCLOSEONLY OMIT
}

// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
	// STARTFETCHONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // initially January 1, year 0
	var err error
	for {
		var fetchDelay time.Duration // initially 0 (no delay)
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)

		select {
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				// Back off for a fixed interval after a failed fetch.
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...)
		}
	}
	// STOPFETCHONLY OMIT
}

// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
	// STARTSENDONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	for {
		var first Item
		var updates chan Item // HLupdates
		if len(pending) > 0 {
			first = pending[0]
			// A send on a nil channel blocks forever, so leaving
			// updates nil disables the send case when pending is empty.
			updates = s.updates // enable send case // HLupdates
		}

		select {
		case updates <- first:
			pending = pending[1:]
		}
	}
	// STOPSENDONLY OMIT
}

// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly.
// mergedLoop services fetch, send, and close on a single goroutine, so the
// state (pending, next, err) needs no locking: it is only touched here.
func (s *sub) mergedLoop() {
	// STARTFETCHVARS OMIT
	var pending []Item
	var next time.Time
	var err error
	// STOPFETCHVARS OMIT
	for {
		// STARTNOCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		// STOPNOCAP OMIT
		// A nil channel disables its select case, so the send case only
		// fires when there is something to deliver.
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}

		// STARTSELECT OMIT
		select {
		case errc := <-s.closing: // HLcases
			errc <- err
			close(s.updates)
			return
		// STARTFETCHCASE OMIT
		case <-startFetch: // HLcases
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				// Back off for a fixed interval after a failed fetch.
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...) // HLfetch
			// STOPFETCHCASE OMIT
		case updates <- first: // HLcases
			pending = pending[1:]
		}
		// STOPSELECT OMIT
	}
}

// dedupeLoop extends mergedLoop with deduping of fetched items.
// dedupeLoop also bounds pending to maxPending items: the fetch case is
// disabled (startFetch left nil) while the backlog is full.
func (s *sub) dedupeLoop() {
	const maxPending = 10
	// STARTSEEN OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
	// STOPSEEN OMIT
	for {
		// STARTCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time            // HLcap
		if len(pending) < maxPending {             // HLcap
			startFetch = time.After(fetchDelay) // enable fetch case // HLcap
		} // HLcap
		// STOPCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		// STARTDEDUPE OMIT
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				// Back off for a fixed interval after a failed fetch.
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if !seen[item.GUID] { // HLdupe
					pending = append(pending, item) // HLdupe
					seen[item.GUID] = true          // HLdupe
				} // HLdupe
			}
			// STOPDEDUPE OMIT
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called. It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	// STARTFETCHDONE OMIT
	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
	// STOPFETCHDONE OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// STARTFETCHIF OMIT
		// Only one Fetch at a time, and none while the backlog is full.
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { // HLfetch
			startFetch = time.After(fetchDelay) // enable fetch case
		}
		// STOPFETCHIF OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTFETCHASYNC OMIT
		select {
		case <-startFetch: // HLfetch
			// Buffer of 1 lets the fetch goroutine deliver its result
			// and exit even if this loop has already returned.
			fetchDone = make(chan fetchResult, 1) // HLfetch
			go func() {
				// := deliberately declares new next/err locals here,
				// shadowing the loop's state until the result is received.
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone: // HLfetch
			fetchDone = nil // HLfetch
			// Use result.fetched, result.next, result.err
			// STOPFETCHASYNC OMIT
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				// Back off for a fixed interval after a failed fetch.
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] { // HLdupe
					pending = append(pending, item)
					seen[id] = true // HLdupe
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// naiveMerge is a version of Merge that doesn't quite work right. In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
+type naiveMerge struct { + subs []Subscription + updates chan Item +} + +// STARTNAIVEMERGE OMIT +func NaiveMerge(subs ...Subscription) Subscription { + m := &naiveMerge{ + subs: subs, + updates: make(chan Item), + } + // STARTNAIVEMERGELOOP OMIT + for _, sub := range subs { + go func(s Subscription) { + for it := range s.Updates() { + m.updates <- it // HL + } + }(sub) + } + // STOPNAIVEMERGELOOP OMIT + return m +} + +// STOPNAIVEMERGE OMIT + +// STARTNAIVEMERGECLOSE OMIT +func (m *naiveMerge) Close() (err error) { + for _, sub := range m.subs { + if e := sub.Close(); err == nil && e != nil { + err = e + } + } + close(m.updates) // HL + return +} + +// STOPNAIVEMERGECLOSE OMIT + +func (m *naiveMerge) Updates() <-chan Item { + return m.updates +} + +type merge struct { + subs []Subscription + updates chan Item + quit chan struct{} + errs chan error +} + +// STARTMERGESIG OMIT +// Merge returns a Subscription that merges the item streams from subs. +// Closing the merged subscription closes subs. +func Merge(subs ...Subscription) Subscription { + // STOPMERGESIG OMIT + m := &merge{ + subs: subs, + updates: make(chan Item), + quit: make(chan struct{}), + errs: make(chan error), + } + // STARTMERGE OMIT + for _, sub := range subs { + go func(s Subscription) { + for { + var it Item + select { + case it = <-s.Updates(): + case <-m.quit: // HL + m.errs <- s.Close() // HL + return // HL + } + select { + case m.updates <- it: + case <-m.quit: // HL + m.errs <- s.Close() // HL + return // HL + } + } + }(sub) + } + // STOPMERGE OMIT + return m +} + +func (m *merge) Updates() <-chan Item { + return m.updates +} + +// STARTMERGECLOSE OMIT +func (m *merge) Close() (err error) { + close(m.quit) // HL + for _ = range m.subs { + if e := <-m.errs; e != nil { // HL + err = e + } + } + close(m.updates) // HL + return +} + +// STOPMERGECLOSE OMIT + +// NaiveDedupe converts a stream of Items that may contain duplicates +// into one that doesn't. 
// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't.
func NaiveDedupe(in <-chan Item) <-chan Item {
	out := make(chan Item)
	go func() {
		seen := make(map[string]bool)
		for it := range in {
			if !seen[it.GUID] {
				// BUG: this send blocks if the
				// receiver closes the Subscription
				// and stops receiving.
				out <- it // HL
				seen[it.GUID] = true
			}
		}
		close(out)
	}()
	return out
}

type deduper struct {
	s       Subscription
	updates chan Item
	closing chan chan error
}

// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
	d := &deduper{
		s:       s,
		updates: make(chan Item),
		closing: make(chan chan error),
	}
	go d.loop()
	return d
}

// loop alternates between receiving (when there is no pending item to send)
// and sending (when there is), using nil channels to disable the idle case.
func (d *deduper) loop() {
	in := d.s.Updates() // enable receive
	var pending Item
	var out chan Item // disable send
	seen := make(map[string]bool)
	for {
		select {
		case it := <-in:
			if !seen[it.GUID] {
				pending = it
				in = nil        // disable receive
				out = d.updates // enable send
				seen[it.GUID] = true
			}
		case out <- pending:
			in = d.s.Updates() // enable receive
			out = nil          // disable send
		case errc := <-d.closing:
			// Close the underlying subscription and relay its error.
			err := d.s.Close()
			errc <- err
			close(d.updates)
			return
		}
	}
}

func (d *deduper) Close() error {
	errc := make(chan error)
	d.closing <- errc
	return <-errc
}

func (d *deduper) Updates() <-chan Item {
	return d.updates
}

// FakeFetch causes Fetch to use a fake fetcher instead of the real
// one.
var FakeFetch bool

// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
	if FakeFetch {
		return fakeFetch(domain)
	}
	return realFetch(domain)
}

func fakeFetch(domain string) Fetcher {
	return &fakeFetcher{channel: domain}
}

type fakeFetcher struct {
	channel string
	items   []Item // all items fetched so far, in order
}

// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool

// Fetch fabricates one new item per call, with a GUID derived from the
// channel name and a sequence number. With FakeDuplicates set it returns
// the full history (so earlier items repeat); otherwise just the new item.
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	// Random delay of 0-2s (in 500ms steps) before the next fetch.
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
		Channel: f.channel,
		Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	if FakeDuplicates {
		items = f.items
	} else {
		items = []Item{item}
	}
	return
}

// realFetch returns a fetcher for the specified blogger domain.
func realFetch(domain string) Fetcher {
	return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain))
}

type fetcher struct {
	uri   string
	feed  *rss.Feed
	items []Item // accumulated by the feed's item handler; drained by Fetch
}

// NewFetcher returns a Fetcher for uri.
func NewFetcher(uri string) Fetcher {
	f := &fetcher{
		uri: uri,
	}
	// The rss package delivers parsed channels/items through callbacks;
	// we ignore channels and accumulate items into f.items.
	newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
	newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
		for _, it := range items {
			f.items = append(f.items, Item{
				Channel: ch.Title,
				GUID:    it.Guid,
				Title:   it.Title,
			})
		}
	}
	f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
	return f
}

// Fetch performs one HTTP fetch of the feed, returns the items gathered by
// the callbacks, and schedules the next fetch per the feed's own timing.
func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
	fmt.Println("fetching", f.uri)
	if err = f.feed.Fetch(f.uri, nil); err != nil {
		return
	}
	items = f.items
	f.items = nil
	next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
	return
}

// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
// support dynamically adding and removing Subscriptions. Reader should dedupe.

// TODO: in a longer talk: make successive Subscribe calls for the same uri
// share the same underlying Subscription, but provide duplicate streams.
+ +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// STARTMAIN OMIT +func main() { + // STARTMERGECALL OMIT + // Subscribe to some feeds, and create a merged update stream. + merged := Merge( + Subscribe(Fetch("blog.golang.org")), + Subscribe(Fetch("googleblog.blogspot.com")), + Subscribe(Fetch("googledevelopers.blogspot.com"))) + // STOPMERGECALL OMIT + + // Close the subscriptions after some time. + time.AfterFunc(3*time.Second, func() { + fmt.Println("closed:", merged.Close()) + }) + + // Print the stream. + for it := range merged.Updates() { + fmt.Println(it.Channel, it.Title) + } + + // Uncomment the panic below to dump the stack traces. This + // will show several stacks for persistent HTTP connections + // created by the real RSS client. To clean these up, we'll + // need to extend Fetcher with a Close method and plumb this + // through the RSS client implementation. + // + // panic("show me the stacks") +} + +// STOPMAIN OMIT diff --git a/concurrency/waitgroup/main.go b/concurrency/waitgroup/main.go new file mode 100644 index 0000000..8edd7a1 --- /dev/null +++ b/concurrency/waitgroup/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "sync" +) + +var wg sync.WaitGroup + +func main() { + + for i := 0; i <= 10; i++ { + go print(i) + wg.Add(1) + } + wg.Wait() +} + +func looprint(count int) { + for i := 0; i <= count; i++ { + fmt.Println(i) + } +} + +func print(i int) { + fmt.Println(i) + wg.Done() +}