mirror of
https://github.com/crazybber/awesome-patterns.git
synced 2024-11-25 06:16:03 +03:00
611 lines
14 KiB
Go
611 lines
14 KiB
Go
// +build OMIT
|
|
|
|
// realmain runs the Subscribe example with a real RSS fetcher.
|
|
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
|
|
rss "github.com/jteeuwen/go-pkg-rss"
|
|
)
|
|
|
|
// STARTITEM OMIT
|
|
// An Item is a stripped-down RSS item.
|
|
type Item struct{ Title, Channel, GUID string }
|
|
|
|
// STOPITEM OMIT
|
|
|
|
// STARTFETCHER OMIT
|
|
// A Fetcher fetches Items and returns the time when the next fetch should be
|
|
// attempted. On failure, Fetch returns a non-nil error.
|
|
type Fetcher interface {
|
|
Fetch() (items []Item, next time.Time, err error)
|
|
}
|
|
|
|
// STOPFETCHER OMIT
|
|
|
|
// STARTSUBSCRIPTION OMIT
|
|
// A Subscription delivers Items over a channel. Close cancels the
|
|
// subscription, closes the Updates channel, and returns the last fetch error,
|
|
// if any.
|
|
type Subscription interface {
|
|
Updates() <-chan Item
|
|
Close() error
|
|
}
|
|
|
|
// STOPSUBSCRIPTION OMIT
|
|
|
|
// STARTSUBSCRIBE OMIT
|
|
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
|
|
func Subscribe(fetcher Fetcher) Subscription {
|
|
s := &sub{
|
|
fetcher: fetcher,
|
|
updates: make(chan Item), // for Updates
|
|
closing: make(chan chan error), // for Close
|
|
}
|
|
go s.loop()
|
|
return s
|
|
}
|
|
|
|
// STOPSUBSCRIBE OMIT
|
|
|
|
// sub implements the Subscription interface.
|
|
type sub struct {
|
|
fetcher Fetcher // fetches items
|
|
updates chan Item // sends items to the user
|
|
closing chan chan error // for Close
|
|
}
|
|
|
|
// STARTUPDATES OMIT
|
|
func (s *sub) Updates() <-chan Item {
|
|
return s.updates
|
|
}
|
|
|
|
// STOPUPDATES OMIT
|
|
|
|
// STARTCLOSE OMIT
|
|
// STARTCLOSESIG OMIT
|
|
func (s *sub) Close() error {
|
|
// STOPCLOSESIG OMIT
|
|
errc := make(chan error)
|
|
s.closing <- errc // HLchan
|
|
return <-errc // HLchan
|
|
}
|
|
|
|
// STOPCLOSE OMIT
|
|
|
|
// loopCloseOnly is a version of loop that includes only the logic
|
|
// that handles Close.
|
|
func (s *sub) loopCloseOnly() {
|
|
// STARTCLOSEONLY OMIT
|
|
var err error // set when Fetch fails
|
|
for {
|
|
select {
|
|
case errc := <-s.closing: // HLchan
|
|
errc <- err // HLchan
|
|
close(s.updates) // tells receiver we're done
|
|
return
|
|
}
|
|
}
|
|
// STOPCLOSEONLY OMIT
|
|
}
|
|
|
|
// loopFetchOnly is a version of loop that includes only the logic
|
|
// that calls Fetch.
|
|
func (s *sub) loopFetchOnly() {
|
|
// STARTFETCHONLY OMIT
|
|
var pending []Item // appended by fetch; consumed by send
|
|
var next time.Time // initially January 1, year 0
|
|
var err error
|
|
for {
|
|
var fetchDelay time.Duration // initally 0 (no delay)
|
|
if now := time.Now(); next.After(now) {
|
|
fetchDelay = next.Sub(now)
|
|
}
|
|
startFetch := time.After(fetchDelay)
|
|
|
|
select {
|
|
case <-startFetch:
|
|
var fetched []Item
|
|
fetched, next, err = s.fetcher.Fetch()
|
|
if err != nil {
|
|
next = time.Now().Add(10 * time.Second)
|
|
break
|
|
}
|
|
pending = append(pending, fetched...)
|
|
}
|
|
}
|
|
// STOPFETCHONLY OMIT
|
|
}
|
|
|
|
// loopSendOnly is a version of loop that includes only the logic for
|
|
// sending items to s.updates.
|
|
func (s *sub) loopSendOnly() {
|
|
// STARTSENDONLY OMIT
|
|
var pending []Item // appended by fetch; consumed by send
|
|
for {
|
|
var first Item
|
|
var updates chan Item // HLupdates
|
|
if len(pending) > 0 {
|
|
first = pending[0]
|
|
updates = s.updates // enable send case // HLupdates
|
|
}
|
|
|
|
select {
|
|
case updates <- first:
|
|
pending = pending[1:]
|
|
}
|
|
}
|
|
// STOPSENDONLY OMIT
|
|
}
|
|
|
|
// mergedLoop is a version of loop that combines loopCloseOnly,
|
|
// loopFetchOnly, and loopSendOnly.
|
|
func (s *sub) mergedLoop() {
|
|
// STARTFETCHVARS OMIT
|
|
var pending []Item
|
|
var next time.Time
|
|
var err error
|
|
// STOPFETCHVARS OMIT
|
|
for {
|
|
// STARTNOCAP OMIT
|
|
var fetchDelay time.Duration
|
|
if now := time.Now(); next.After(now) {
|
|
fetchDelay = next.Sub(now)
|
|
}
|
|
startFetch := time.After(fetchDelay)
|
|
// STOPNOCAP OMIT
|
|
var first Item
|
|
var updates chan Item
|
|
if len(pending) > 0 {
|
|
first = pending[0]
|
|
updates = s.updates // enable send case
|
|
}
|
|
|
|
// STARTSELECT OMIT
|
|
select {
|
|
case errc := <-s.closing: // HLcases
|
|
errc <- err
|
|
close(s.updates)
|
|
return
|
|
// STARTFETCHCASE OMIT
|
|
case <-startFetch: // HLcases
|
|
var fetched []Item
|
|
fetched, next, err = s.fetcher.Fetch() // HLfetch
|
|
if err != nil {
|
|
next = time.Now().Add(10 * time.Second)
|
|
break
|
|
}
|
|
pending = append(pending, fetched...) // HLfetch
|
|
// STOPFETCHCASE OMIT
|
|
case updates <- first: // HLcases
|
|
pending = pending[1:]
|
|
}
|
|
// STOPSELECT OMIT
|
|
}
|
|
}
|
|
|
|
// dedupeLoop extends mergedLoop with deduping of fetched items.
|
|
func (s *sub) dedupeLoop() {
|
|
const maxPending = 10
|
|
// STARTSEEN OMIT
|
|
var pending []Item
|
|
var next time.Time
|
|
var err error
|
|
var seen = make(map[string]bool) // set of item.GUIDs // HLseen
|
|
// STOPSEEN OMIT
|
|
for {
|
|
// STARTCAP OMIT
|
|
var fetchDelay time.Duration
|
|
if now := time.Now(); next.After(now) {
|
|
fetchDelay = next.Sub(now)
|
|
}
|
|
var startFetch <-chan time.Time // HLcap
|
|
if len(pending) < maxPending { // HLcap
|
|
startFetch = time.After(fetchDelay) // enable fetch case // HLcap
|
|
} // HLcap
|
|
// STOPCAP OMIT
|
|
var first Item
|
|
var updates chan Item
|
|
if len(pending) > 0 {
|
|
first = pending[0]
|
|
updates = s.updates // enable send case
|
|
}
|
|
select {
|
|
case errc := <-s.closing:
|
|
errc <- err
|
|
close(s.updates)
|
|
return
|
|
// STARTDEDUPE OMIT
|
|
case <-startFetch:
|
|
var fetched []Item
|
|
fetched, next, err = s.fetcher.Fetch() // HLfetch
|
|
if err != nil {
|
|
next = time.Now().Add(10 * time.Second)
|
|
break
|
|
}
|
|
for _, item := range fetched {
|
|
if !seen[item.GUID] { // HLdupe
|
|
pending = append(pending, item) // HLdupe
|
|
seen[item.GUID] = true // HLdupe
|
|
} // HLdupe
|
|
}
|
|
// STOPDEDUPE OMIT
|
|
case updates <- first:
|
|
pending = pending[1:]
|
|
}
|
|
}
|
|
}
|
|
|
|
// loop periodically fecthes Items, sends them on s.updates, and exits
|
|
// when Close is called. It extends dedupeLoop with logic to run
|
|
// Fetch asynchronously.
|
|
func (s *sub) loop() {
|
|
const maxPending = 10
|
|
type fetchResult struct {
|
|
fetched []Item
|
|
next time.Time
|
|
err error
|
|
}
|
|
// STARTFETCHDONE OMIT
|
|
var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
|
|
// STOPFETCHDONE OMIT
|
|
var pending []Item
|
|
var next time.Time
|
|
var err error
|
|
var seen = make(map[string]bool)
|
|
for {
|
|
var fetchDelay time.Duration
|
|
if now := time.Now(); next.After(now) {
|
|
fetchDelay = next.Sub(now)
|
|
}
|
|
// STARTFETCHIF OMIT
|
|
var startFetch <-chan time.Time
|
|
if fetchDone == nil && len(pending) < maxPending { // HLfetch
|
|
startFetch = time.After(fetchDelay) // enable fetch case
|
|
}
|
|
// STOPFETCHIF OMIT
|
|
var first Item
|
|
var updates chan Item
|
|
if len(pending) > 0 {
|
|
first = pending[0]
|
|
updates = s.updates // enable send case
|
|
}
|
|
// STARTFETCHASYNC OMIT
|
|
select {
|
|
case <-startFetch: // HLfetch
|
|
fetchDone = make(chan fetchResult, 1) // HLfetch
|
|
go func() {
|
|
fetched, next, err := s.fetcher.Fetch()
|
|
fetchDone <- fetchResult{fetched, next, err}
|
|
}()
|
|
case result := <-fetchDone: // HLfetch
|
|
fetchDone = nil // HLfetch
|
|
// Use result.fetched, result.next, result.err
|
|
// STOPFETCHASYNC OMIT
|
|
fetched := result.fetched
|
|
next, err = result.next, result.err
|
|
if err != nil {
|
|
next = time.Now().Add(10 * time.Second)
|
|
break
|
|
}
|
|
for _, item := range fetched {
|
|
if id := item.GUID; !seen[id] { // HLdupe
|
|
pending = append(pending, item)
|
|
seen[id] = true // HLdupe
|
|
}
|
|
}
|
|
case errc := <-s.closing:
|
|
errc <- err
|
|
close(s.updates)
|
|
return
|
|
case updates <- first:
|
|
pending = pending[1:]
|
|
}
|
|
}
|
|
}
|
|
|
|
// naiveMerge is a version of Merge that doesn't quite work right. In
|
|
// particular, the goroutines it starts may block forever on m.updates
|
|
// if the receiver stops receiving.
|
|
type naiveMerge struct {
|
|
subs []Subscription
|
|
updates chan Item
|
|
}
|
|
|
|
// STARTNAIVEMERGE OMIT
|
|
func NaiveMerge(subs ...Subscription) Subscription {
|
|
m := &naiveMerge{
|
|
subs: subs,
|
|
updates: make(chan Item),
|
|
}
|
|
// STARTNAIVEMERGELOOP OMIT
|
|
for _, sub := range subs {
|
|
go func(s Subscription) {
|
|
for it := range s.Updates() {
|
|
m.updates <- it // HL
|
|
}
|
|
}(sub)
|
|
}
|
|
// STOPNAIVEMERGELOOP OMIT
|
|
return m
|
|
}
|
|
|
|
// STOPNAIVEMERGE OMIT
|
|
|
|
// STARTNAIVEMERGECLOSE OMIT
|
|
func (m *naiveMerge) Close() (err error) {
|
|
for _, sub := range m.subs {
|
|
if e := sub.Close(); err == nil && e != nil {
|
|
err = e
|
|
}
|
|
}
|
|
close(m.updates) // HL
|
|
return
|
|
}
|
|
|
|
// STOPNAIVEMERGECLOSE OMIT
|
|
|
|
func (m *naiveMerge) Updates() <-chan Item {
|
|
return m.updates
|
|
}
|
|
|
|
type merge struct {
|
|
subs []Subscription
|
|
updates chan Item
|
|
quit chan struct{}
|
|
errs chan error
|
|
}
|
|
|
|
// STARTMERGESIG OMIT
|
|
// Merge returns a Subscription that merges the item streams from subs.
|
|
// Closing the merged subscription closes subs.
|
|
func Merge(subs ...Subscription) Subscription {
|
|
// STOPMERGESIG OMIT
|
|
m := &merge{
|
|
subs: subs,
|
|
updates: make(chan Item),
|
|
quit: make(chan struct{}),
|
|
errs: make(chan error),
|
|
}
|
|
// STARTMERGE OMIT
|
|
for _, sub := range subs {
|
|
go func(s Subscription) {
|
|
for {
|
|
var it Item
|
|
select {
|
|
case it = <-s.Updates():
|
|
case <-m.quit: // HL
|
|
m.errs <- s.Close() // HL
|
|
return // HL
|
|
}
|
|
select {
|
|
case m.updates <- it:
|
|
case <-m.quit: // HL
|
|
m.errs <- s.Close() // HL
|
|
return // HL
|
|
}
|
|
}
|
|
}(sub)
|
|
}
|
|
// STOPMERGE OMIT
|
|
return m
|
|
}
|
|
|
|
func (m *merge) Updates() <-chan Item {
|
|
return m.updates
|
|
}
|
|
|
|
// STARTMERGECLOSE OMIT
|
|
func (m *merge) Close() (err error) {
|
|
close(m.quit) // HL
|
|
for _ = range m.subs {
|
|
if e := <-m.errs; e != nil { // HL
|
|
err = e
|
|
}
|
|
}
|
|
close(m.updates) // HL
|
|
return
|
|
}
|
|
|
|
// STOPMERGECLOSE OMIT
|
|
|
|
// NaiveDedupe converts a stream of Items that may contain duplicates
|
|
// into one that doesn't.
|
|
func NaiveDedupe(in <-chan Item) <-chan Item {
|
|
out := make(chan Item)
|
|
go func() {
|
|
seen := make(map[string]bool)
|
|
for it := range in {
|
|
if !seen[it.GUID] {
|
|
// BUG: this send blocks if the
|
|
// receiver closes the Subscription
|
|
// and stops receiving.
|
|
out <- it // HL
|
|
seen[it.GUID] = true
|
|
}
|
|
}
|
|
close(out)
|
|
}()
|
|
return out
|
|
}
|
|
|
|
type deduper struct {
|
|
s Subscription
|
|
updates chan Item
|
|
closing chan chan error
|
|
}
|
|
|
|
// Dedupe converts a Subscription that may send duplicate Items into
|
|
// one that doesn't.
|
|
func Dedupe(s Subscription) Subscription {
|
|
d := &deduper{
|
|
s: s,
|
|
updates: make(chan Item),
|
|
closing: make(chan chan error),
|
|
}
|
|
go d.loop()
|
|
return d
|
|
}
|
|
|
|
func (d *deduper) loop() {
|
|
in := d.s.Updates() // enable receive
|
|
var pending Item
|
|
var out chan Item // disable send
|
|
seen := make(map[string]bool)
|
|
for {
|
|
select {
|
|
case it := <-in:
|
|
if !seen[it.GUID] {
|
|
pending = it
|
|
in = nil // disable receive
|
|
out = d.updates // enable send
|
|
seen[it.GUID] = true
|
|
}
|
|
case out <- pending:
|
|
in = d.s.Updates() // enable receive
|
|
out = nil // disable send
|
|
case errc := <-d.closing:
|
|
err := d.s.Close()
|
|
errc <- err
|
|
close(d.updates)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *deduper) Close() error {
|
|
errc := make(chan error)
|
|
d.closing <- errc
|
|
return <-errc
|
|
}
|
|
|
|
func (d *deduper) Updates() <-chan Item {
|
|
return d.updates
|
|
}
|
|
|
|
// FakeFetch causes Fetch to use a fake fetcher instead of the real
|
|
// one.
|
|
var FakeFetch bool
|
|
|
|
// Fetch returns a Fetcher for Items from domain.
|
|
func Fetch(domain string) Fetcher {
|
|
if FakeFetch {
|
|
return fakeFetch(domain)
|
|
}
|
|
return realFetch(domain)
|
|
}
|
|
|
|
func fakeFetch(domain string) Fetcher {
|
|
return &fakeFetcher{channel: domain}
|
|
}
|
|
|
|
type fakeFetcher struct {
|
|
channel string
|
|
items []Item
|
|
}
|
|
|
|
// FakeDuplicates causes the fake fetcher to return duplicate items.
|
|
var FakeDuplicates bool
|
|
|
|
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
|
|
now := time.Now()
|
|
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
|
|
item := Item{
|
|
Channel: f.channel,
|
|
Title: fmt.Sprintf("Item %d", len(f.items)),
|
|
}
|
|
item.GUID = item.Channel + "/" + item.Title
|
|
f.items = append(f.items, item)
|
|
if FakeDuplicates {
|
|
items = f.items
|
|
} else {
|
|
items = []Item{item}
|
|
}
|
|
return
|
|
}
|
|
|
|
// realFetch returns a fetcher for the specified blogger domain.
|
|
func realFetch(domain string) Fetcher {
|
|
return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain))
|
|
}
|
|
|
|
type fetcher struct {
|
|
uri string
|
|
feed *rss.Feed
|
|
items []Item
|
|
}
|
|
|
|
// NewFetcher returns a Fetcher for uri.
|
|
func NewFetcher(uri string) Fetcher {
|
|
f := &fetcher{
|
|
uri: uri,
|
|
}
|
|
newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
|
|
newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
|
|
for _, it := range items {
|
|
f.items = append(f.items, Item{
|
|
Channel: ch.Title,
|
|
GUID: it.Guid,
|
|
Title: it.Title,
|
|
})
|
|
}
|
|
}
|
|
f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
|
|
return f
|
|
}
|
|
|
|
func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
|
|
fmt.Println("fetching", f.uri)
|
|
if err = f.feed.Fetch(f.uri, nil); err != nil {
|
|
return
|
|
}
|
|
items = f.items
|
|
f.items = nil
|
|
next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
|
|
return
|
|
}
|
|
|
|
// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
|
|
// support dynamically adding and removing Subscriptions. Reader should dedupe.
|
|
|
|
// TODO: in a longer talk: make successive Subscribe calls for the same uri
|
|
// share the same underlying Subscription, but provide duplicate streams.
|
|
|
|
func init() {
|
|
rand.Seed(time.Now().UnixNano())
|
|
}
|
|
|
|
// STARTMAIN OMIT
|
|
func main() {
|
|
// STARTMERGECALL OMIT
|
|
// Subscribe to some feeds, and create a merged update stream.
|
|
merged := Merge(
|
|
Subscribe(Fetch("blog.golang.org")),
|
|
Subscribe(Fetch("googleblog.blogspot.com")),
|
|
Subscribe(Fetch("googledevelopers.blogspot.com")))
|
|
// STOPMERGECALL OMIT
|
|
|
|
// Close the subscriptions after some time.
|
|
time.AfterFunc(3*time.Second, func() {
|
|
fmt.Println("closed:", merged.Close())
|
|
})
|
|
|
|
// Print the stream.
|
|
for it := range merged.Updates() {
|
|
fmt.Println(it.Channel, it.Title)
|
|
}
|
|
|
|
// Uncomment the panic below to dump the stack traces. This
|
|
// will show several stacks for persistent HTTP connections
|
|
// created by the real RSS client. To clean these up, we'll
|
|
// need to extend Fetcher with a Close method and plumb this
|
|
// through the RSS client implementation.
|
|
//
|
|
// panic("show me the stacks")
|
|
}
|
|
|
|
// STOPMAIN OMIT
|