// +build OMIT

// realmain runs the Subscribe example with a real RSS fetcher.
package main

import (
	"fmt"
	"math/rand"
	"time"

	rss "github.com/jteeuwen/go-pkg-rss"
)

// STARTITEM OMIT
// An Item is a stripped-down RSS item.
type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted. On failure, Fetch returns a non-nil error.
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel. Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
// All of the subscription's state is owned by the single goroutine running
// s.loop, which serializes fetching, sending, and closing without locks.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),       // for Updates
		closing: make(chan chan error), // for Close
	}
	go s.loop()
	return s
}

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close
}

// STARTUPDATES OMIT
// Updates returns the channel on which the loop goroutine delivers Items.
func (s *sub) Updates() <-chan Item {
	return s.updates
}

// STOPUPDATES OMIT

// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
// Close asks the loop goroutine to exit and waits for its final error.
// It uses a request/response pattern: a fresh error channel is sent to
// loop over s.closing, and loop replies on it before returning.
func (s *sub) Close() error {
	// STOPCLOSESIG OMIT
	errc := make(chan error)
	s.closing <- errc // HLchan
	return <-errc     // HLchan
}

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
func (s *sub) loopCloseOnly() {
	// STARTCLOSEONLY OMIT
	var err error // set when Fetch fails
	for {
		select {
		case errc := <-s.closing: // HLchan
			// Reply to Close with the last fetch error, then shut down.
			errc <- err      // HLchan
			close(s.updates) // tells receiver we're done
			return
		}
	}
	// STOPCLOSEONLY OMIT
}

// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
	// STARTFETCHONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // initially January 1, year 0
	var err error
	for {
		var fetchDelay time.Duration // initially 0 (no delay)
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		select {
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				// Back off: retry the failed fetch in 10 seconds.
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...)
		}
	}
	// STOPFETCHONLY OMIT
}

// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
	// STARTSENDONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	for {
		// A nil channel disables its case in a select; updates is left nil
		// (send disabled) unless there is a pending Item to deliver.
		var first Item
		var updates chan Item // HLupdates
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case // HLupdates
		}
		select {
		case updates <- first:
			pending = pending[1:]
		}
	}
	// STOPSENDONLY OMIT
}

// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly.
func (s *sub) mergedLoop() {
	// STARTFETCHVARS OMIT
	var pending []Item
	var next time.Time
	var err error
	// STOPFETCHVARS OMIT
	for {
		// STARTNOCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		// STOPNOCAP OMIT
		// Leave updates nil (send case disabled) unless an Item is pending.
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTSELECT OMIT
		select {
		case errc := <-s.closing: // HLcases
			errc <- err
			close(s.updates)
			return
		// STARTFETCHCASE OMIT
		case <-startFetch: // HLcases
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				// Back off: retry the failed fetch in 10 seconds.
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...) // HLfetch
		// STOPFETCHCASE OMIT
		case updates <- first: // HLcases
			pending = pending[1:]
		}
		// STOPSELECT OMIT
	}
}

// dedupeLoop extends mergedLoop with deduping of fetched items.
func (s *sub) dedupeLoop() {
	const maxPending = 10
	// STARTSEEN OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
	// STOPSEEN OMIT
	for {
		// STARTCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// Cap pending: a nil startFetch disables the fetch case, so no new
		// items are fetched while maxPending are already waiting to be sent.
		var startFetch <-chan time.Time // HLcap
		if len(pending) < maxPending {  // HLcap
			startFetch = time.After(fetchDelay) // enable fetch case // HLcap
		} // HLcap
		// STOPCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		// STARTDEDUPE OMIT
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if !seen[item.GUID] { // HLdupe
					pending = append(pending, item) // HLdupe
					seen[item.GUID] = true          // HLdupe
				} // HLdupe
			}
			// STOPDEDUPE OMIT
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called. It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	// STARTFETCHDONE OMIT
	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
	// STOPFETCHDONE OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// STARTFETCHIF OMIT
		// Only start a fetch when none is in flight AND there is room in
		// pending; a nil startFetch disables the fetch case.
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { // HLfetch
			startFetch = time.After(fetchDelay) // enable fetch case
		}
		// STOPFETCHIF OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTFETCHASYNC OMIT
		select {
		case <-startFetch: // HLfetch
			// Buffer of 1 lets the fetch goroutine deliver its result and
			// exit without blocking, even if loop never receives it.
			fetchDone = make(chan fetchResult, 1) // HLfetch
			go func() {
				// := deliberately shadows the outer next and err; the
				// result reaches loop only via fetchDone.
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone: // HLfetch
			fetchDone = nil // HLfetch
			// Use result.fetched, result.next, result.err
			// STOPFETCHASYNC OMIT
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] { // HLdupe
					pending = append(pending, item)
					seen[id] = true // HLdupe
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// naiveMerge is a version of Merge that doesn't quite work right. In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct { subs []Subscription updates chan Item } // STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m } // STOPNAIVEMERGE OMIT // STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return } // STOPNAIVEMERGECLOSE OMIT func (m *naiveMerge) Updates() <-chan Item { return m.updates } type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error } // STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m } func (m *merge) Updates() <-chan Item { return m.updates } // STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return } // STOPMERGECLOSE OMIT // NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. 
func NaiveDedupe(in <-chan Item) <-chan Item {
	out := make(chan Item)
	go func() {
		seen := make(map[string]bool)
		for it := range in {
			if !seen[it.GUID] {
				// BUG: this send blocks if the
				// receiver closes the Subscription
				// and stops receiving.
				out <- it // HL
				seen[it.GUID] = true
			}
		}
		close(out)
	}()
	return out
}

// deduper wraps a Subscription and filters out duplicate Items.
type deduper struct {
	s       Subscription
	updates chan Item       // deduplicated item stream
	closing chan chan error // for Close, same pattern as sub.closing
}

// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
	d := &deduper{
		s:       s,
		updates: make(chan Item),
		closing: make(chan chan error),
	}
	go d.loop()
	return d
}

// loop alternates between receiving from the wrapped subscription and
// sending a single pending Item, using nil channels to disable whichever
// case is not currently valid. Unlike NaiveDedupe, it also watches
// d.closing so it can always shut down.
func (d *deduper) loop() {
	in := d.s.Updates() // enable receive
	var pending Item
	var out chan Item // disable send
	seen := make(map[string]bool)
	for {
		select {
		case it := <-in:
			if !seen[it.GUID] {
				pending = it
				in = nil        // disable receive
				out = d.updates // enable send
				seen[it.GUID] = true
			}
		case out <- pending:
			in = d.s.Updates() // enable receive
			out = nil          // disable send
		case errc := <-d.closing:
			// Close the wrapped subscription and report its error.
			err := d.s.Close()
			errc <- err
			close(d.updates)
			return
		}
	}
}

// Close stops loop and returns the wrapped subscription's Close error.
func (d *deduper) Close() error {
	errc := make(chan error)
	d.closing <- errc
	return <-errc
}

// Updates returns the deduplicated item stream.
func (d *deduper) Updates() <-chan Item {
	return d.updates
}

// FakeFetch causes Fetch to use a fake fetcher instead of the real
// one.
var FakeFetch bool

// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
	if FakeFetch {
		return fakeFetch(domain)
	}
	return realFetch(domain)
}

func fakeFetch(domain string) Fetcher {
	return &fakeFetcher{channel: domain}
}

// fakeFetcher generates synthetic Items without any network access.
type fakeFetcher struct {
	channel string
	items   []Item // all items generated so far
}

// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool

// Fetch fabricates one new Item per call and schedules the next fetch
// 0-2 seconds from now. When FakeDuplicates is set it returns every item
// generated so far (so callers see duplicates); otherwise just the new one.
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
		Channel: f.channel,
		Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	if FakeDuplicates {
		items = f.items
	} else {
		items = []Item{item}
	}
	return
}

// realFetch returns a fetcher for the specified blogger domain.
func realFetch(domain string) Fetcher {
	return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain))
}

// fetcher adapts the go-pkg-rss client to the Fetcher interface.
type fetcher struct {
	uri   string
	feed  *rss.Feed
	items []Item // accumulated by the newItems callback; drained by Fetch
}

// NewFetcher returns a Fetcher for uri.
func NewFetcher(uri string) Fetcher {
	f := &fetcher{
		uri: uri,
	}
	// The rss package delivers parsed channels/items via callbacks;
	// newItems appends them to f.items for the next Fetch call to return.
	newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
	newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
		for _, it := range items {
			f.items = append(f.items, Item{
				Channel: ch.Title,
				GUID:    it.Guid,
				Title:   it.Title,
			})
		}
	}
	f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
	return f
}

// Fetch polls the feed once, returns the items accumulated by the
// callbacks, and computes the next poll time from the feed's own
// update hints.
func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
	fmt.Println("fetching", f.uri)
	if err = f.feed.Fetch(f.uri, nil); err != nil {
		return
	}
	items = f.items
	f.items = nil
	next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
	return
}

// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
// support dynamically adding and removing Subscriptions. Reader should dedupe.

// TODO: in a longer talk: make successive Subscribe calls for the same uri
// share the same underlying Subscription, but provide duplicate streams.

// init seeds math/rand so the fake fetcher's intervals vary across runs.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// STARTMAIN OMIT
func main() {
	// STARTMERGECALL OMIT
	// Subscribe to some feeds, and create a merged update stream.
	merged := Merge(
		Subscribe(Fetch("blog.golang.org")),
		Subscribe(Fetch("googleblog.blogspot.com")),
		Subscribe(Fetch("googledevelopers.blogspot.com")))
	// STOPMERGECALL OMIT

	// Close the subscriptions after some time.
	time.AfterFunc(3*time.Second, func() {
		fmt.Println("closed:", merged.Close())
	})

	// Print the stream.
	for it := range merged.Updates() {
		fmt.Println(it.Channel, it.Title)
	}

	// Uncomment the panic below to dump the stack traces. This
	// will show several stacks for persistent HTTP connections
	// created by the real RSS client. To clean these up, we'll
	// need to extend Fetcher with a Close method and plumb this
	// through the RSS client implementation.
	//
	// panic("show me the stacks")
}

// STOPMAIN OMIT