mirror of
https://github.com/crazybber/awesome-patterns.git
synced 2024-11-22 04:36:02 +03:00
added wait group
This commit is contained in:
parent
1d6b444d01
commit
a216296557
610
concurrency/feedreader/main.go
Normal file
610
concurrency/feedreader/main.go
Normal file
@ -0,0 +1,610 @@
|
||||
// +build OMIT
|
||||
|
||||
// realmain runs the Subscribe example with a real RSS fetcher.
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
rss "github.com/jteeuwen/go-pkg-rss"
|
||||
)
|
||||
|
||||
// STARTITEM OMIT
|
||||
// An Item is a stripped-down RSS item.
|
||||
type Item struct{ Title, Channel, GUID string }
|
||||
|
||||
// STOPITEM OMIT
|
||||
|
||||
// STARTFETCHER OMIT
|
||||
// A Fetcher fetches Items and returns the time when the next fetch should be
|
||||
// attempted. On failure, Fetch returns a non-nil error.
|
||||
type Fetcher interface {
|
||||
Fetch() (items []Item, next time.Time, err error)
|
||||
}
|
||||
|
||||
// STOPFETCHER OMIT
|
||||
|
||||
// STARTSUBSCRIPTION OMIT
|
||||
// A Subscription delivers Items over a channel. Close cancels the
|
||||
// subscription, closes the Updates channel, and returns the last fetch error,
|
||||
// if any.
|
||||
type Subscription interface {
|
||||
Updates() <-chan Item
|
||||
Close() error
|
||||
}
|
||||
|
||||
// STOPSUBSCRIPTION OMIT
|
||||
|
||||
// STARTSUBSCRIBE OMIT
|
||||
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
|
||||
func Subscribe(fetcher Fetcher) Subscription {
|
||||
s := &sub{
|
||||
fetcher: fetcher,
|
||||
updates: make(chan Item), // for Updates
|
||||
closing: make(chan chan error), // for Close
|
||||
}
|
||||
go s.loop()
|
||||
return s
|
||||
}
|
||||
|
||||
// STOPSUBSCRIBE OMIT
|
||||
|
||||
// sub implements the Subscription interface.
|
||||
type sub struct {
|
||||
fetcher Fetcher // fetches items
|
||||
updates chan Item // sends items to the user
|
||||
closing chan chan error // for Close
|
||||
}
|
||||
|
||||
// STARTUPDATES OMIT
|
||||
func (s *sub) Updates() <-chan Item {
|
||||
return s.updates
|
||||
}
|
||||
|
||||
// STOPUPDATES OMIT
|
||||
|
||||
// STARTCLOSE OMIT
|
||||
// STARTCLOSESIG OMIT
|
||||
func (s *sub) Close() error {
|
||||
// STOPCLOSESIG OMIT
|
||||
errc := make(chan error)
|
||||
s.closing <- errc // HLchan
|
||||
return <-errc // HLchan
|
||||
}
|
||||
|
||||
// STOPCLOSE OMIT
|
||||
|
||||
// loopCloseOnly is a version of loop that includes only the logic
|
||||
// that handles Close.
|
||||
func (s *sub) loopCloseOnly() {
|
||||
// STARTCLOSEONLY OMIT
|
||||
var err error // set when Fetch fails
|
||||
for {
|
||||
select {
|
||||
case errc := <-s.closing: // HLchan
|
||||
errc <- err // HLchan
|
||||
close(s.updates) // tells receiver we're done
|
||||
return
|
||||
}
|
||||
}
|
||||
// STOPCLOSEONLY OMIT
|
||||
}
|
||||
|
||||
// loopFetchOnly is a version of loop that includes only the logic
|
||||
// that calls Fetch.
|
||||
func (s *sub) loopFetchOnly() {
|
||||
// STARTFETCHONLY OMIT
|
||||
var pending []Item // appended by fetch; consumed by send
|
||||
var next time.Time // initially January 1, year 0
|
||||
var err error
|
||||
for {
|
||||
var fetchDelay time.Duration // initally 0 (no delay)
|
||||
if now := time.Now(); next.After(now) {
|
||||
fetchDelay = next.Sub(now)
|
||||
}
|
||||
startFetch := time.After(fetchDelay)
|
||||
|
||||
select {
|
||||
case <-startFetch:
|
||||
var fetched []Item
|
||||
fetched, next, err = s.fetcher.Fetch()
|
||||
if err != nil {
|
||||
next = time.Now().Add(10 * time.Second)
|
||||
break
|
||||
}
|
||||
pending = append(pending, fetched...)
|
||||
}
|
||||
}
|
||||
// STOPFETCHONLY OMIT
|
||||
}
|
||||
|
||||
// loopSendOnly is a version of loop that includes only the logic for
|
||||
// sending items to s.updates.
|
||||
func (s *sub) loopSendOnly() {
|
||||
// STARTSENDONLY OMIT
|
||||
var pending []Item // appended by fetch; consumed by send
|
||||
for {
|
||||
var first Item
|
||||
var updates chan Item // HLupdates
|
||||
if len(pending) > 0 {
|
||||
first = pending[0]
|
||||
updates = s.updates // enable send case // HLupdates
|
||||
}
|
||||
|
||||
select {
|
||||
case updates <- first:
|
||||
pending = pending[1:]
|
||||
}
|
||||
}
|
||||
// STOPSENDONLY OMIT
|
||||
}
|
||||
|
||||
// mergedLoop is a version of loop that combines loopCloseOnly,
|
||||
// loopFetchOnly, and loopSendOnly.
|
||||
func (s *sub) mergedLoop() {
|
||||
// STARTFETCHVARS OMIT
|
||||
var pending []Item
|
||||
var next time.Time
|
||||
var err error
|
||||
// STOPFETCHVARS OMIT
|
||||
for {
|
||||
// STARTNOCAP OMIT
|
||||
var fetchDelay time.Duration
|
||||
if now := time.Now(); next.After(now) {
|
||||
fetchDelay = next.Sub(now)
|
||||
}
|
||||
startFetch := time.After(fetchDelay)
|
||||
// STOPNOCAP OMIT
|
||||
var first Item
|
||||
var updates chan Item
|
||||
if len(pending) > 0 {
|
||||
first = pending[0]
|
||||
updates = s.updates // enable send case
|
||||
}
|
||||
|
||||
// STARTSELECT OMIT
|
||||
select {
|
||||
case errc := <-s.closing: // HLcases
|
||||
errc <- err
|
||||
close(s.updates)
|
||||
return
|
||||
// STARTFETCHCASE OMIT
|
||||
case <-startFetch: // HLcases
|
||||
var fetched []Item
|
||||
fetched, next, err = s.fetcher.Fetch() // HLfetch
|
||||
if err != nil {
|
||||
next = time.Now().Add(10 * time.Second)
|
||||
break
|
||||
}
|
||||
pending = append(pending, fetched...) // HLfetch
|
||||
// STOPFETCHCASE OMIT
|
||||
case updates <- first: // HLcases
|
||||
pending = pending[1:]
|
||||
}
|
||||
// STOPSELECT OMIT
|
||||
}
|
||||
}
|
||||
|
||||
// dedupeLoop extends mergedLoop with deduping of fetched items.
|
||||
func (s *sub) dedupeLoop() {
|
||||
const maxPending = 10
|
||||
// STARTSEEN OMIT
|
||||
var pending []Item
|
||||
var next time.Time
|
||||
var err error
|
||||
var seen = make(map[string]bool) // set of item.GUIDs // HLseen
|
||||
// STOPSEEN OMIT
|
||||
for {
|
||||
// STARTCAP OMIT
|
||||
var fetchDelay time.Duration
|
||||
if now := time.Now(); next.After(now) {
|
||||
fetchDelay = next.Sub(now)
|
||||
}
|
||||
var startFetch <-chan time.Time // HLcap
|
||||
if len(pending) < maxPending { // HLcap
|
||||
startFetch = time.After(fetchDelay) // enable fetch case // HLcap
|
||||
} // HLcap
|
||||
// STOPCAP OMIT
|
||||
var first Item
|
||||
var updates chan Item
|
||||
if len(pending) > 0 {
|
||||
first = pending[0]
|
||||
updates = s.updates // enable send case
|
||||
}
|
||||
select {
|
||||
case errc := <-s.closing:
|
||||
errc <- err
|
||||
close(s.updates)
|
||||
return
|
||||
// STARTDEDUPE OMIT
|
||||
case <-startFetch:
|
||||
var fetched []Item
|
||||
fetched, next, err = s.fetcher.Fetch() // HLfetch
|
||||
if err != nil {
|
||||
next = time.Now().Add(10 * time.Second)
|
||||
break
|
||||
}
|
||||
for _, item := range fetched {
|
||||
if !seen[item.GUID] { // HLdupe
|
||||
pending = append(pending, item) // HLdupe
|
||||
seen[item.GUID] = true // HLdupe
|
||||
} // HLdupe
|
||||
}
|
||||
// STOPDEDUPE OMIT
|
||||
case updates <- first:
|
||||
pending = pending[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// loop periodically fecthes Items, sends them on s.updates, and exits
|
||||
// when Close is called. It extends dedupeLoop with logic to run
|
||||
// Fetch asynchronously.
|
||||
func (s *sub) loop() {
|
||||
const maxPending = 10
|
||||
type fetchResult struct {
|
||||
fetched []Item
|
||||
next time.Time
|
||||
err error
|
||||
}
|
||||
// STARTFETCHDONE OMIT
|
||||
var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
|
||||
// STOPFETCHDONE OMIT
|
||||
var pending []Item
|
||||
var next time.Time
|
||||
var err error
|
||||
var seen = make(map[string]bool)
|
||||
for {
|
||||
var fetchDelay time.Duration
|
||||
if now := time.Now(); next.After(now) {
|
||||
fetchDelay = next.Sub(now)
|
||||
}
|
||||
// STARTFETCHIF OMIT
|
||||
var startFetch <-chan time.Time
|
||||
if fetchDone == nil && len(pending) < maxPending { // HLfetch
|
||||
startFetch = time.After(fetchDelay) // enable fetch case
|
||||
}
|
||||
// STOPFETCHIF OMIT
|
||||
var first Item
|
||||
var updates chan Item
|
||||
if len(pending) > 0 {
|
||||
first = pending[0]
|
||||
updates = s.updates // enable send case
|
||||
}
|
||||
// STARTFETCHASYNC OMIT
|
||||
select {
|
||||
case <-startFetch: // HLfetch
|
||||
fetchDone = make(chan fetchResult, 1) // HLfetch
|
||||
go func() {
|
||||
fetched, next, err := s.fetcher.Fetch()
|
||||
fetchDone <- fetchResult{fetched, next, err}
|
||||
}()
|
||||
case result := <-fetchDone: // HLfetch
|
||||
fetchDone = nil // HLfetch
|
||||
// Use result.fetched, result.next, result.err
|
||||
// STOPFETCHASYNC OMIT
|
||||
fetched := result.fetched
|
||||
next, err = result.next, result.err
|
||||
if err != nil {
|
||||
next = time.Now().Add(10 * time.Second)
|
||||
break
|
||||
}
|
||||
for _, item := range fetched {
|
||||
if id := item.GUID; !seen[id] { // HLdupe
|
||||
pending = append(pending, item)
|
||||
seen[id] = true // HLdupe
|
||||
}
|
||||
}
|
||||
case errc := <-s.closing:
|
||||
errc <- err
|
||||
close(s.updates)
|
||||
return
|
||||
case updates <- first:
|
||||
pending = pending[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// naiveMerge is a version of Merge that doesn't quite work right. In
|
||||
// particular, the goroutines it starts may block forever on m.updates
|
||||
// if the receiver stops receiving.
|
||||
type naiveMerge struct {
|
||||
subs []Subscription
|
||||
updates chan Item
|
||||
}
|
||||
|
||||
// STARTNAIVEMERGE OMIT
|
||||
func NaiveMerge(subs ...Subscription) Subscription {
|
||||
m := &naiveMerge{
|
||||
subs: subs,
|
||||
updates: make(chan Item),
|
||||
}
|
||||
// STARTNAIVEMERGELOOP OMIT
|
||||
for _, sub := range subs {
|
||||
go func(s Subscription) {
|
||||
for it := range s.Updates() {
|
||||
m.updates <- it // HL
|
||||
}
|
||||
}(sub)
|
||||
}
|
||||
// STOPNAIVEMERGELOOP OMIT
|
||||
return m
|
||||
}
|
||||
|
||||
// STOPNAIVEMERGE OMIT
|
||||
|
||||
// STARTNAIVEMERGECLOSE OMIT
|
||||
func (m *naiveMerge) Close() (err error) {
|
||||
for _, sub := range m.subs {
|
||||
if e := sub.Close(); err == nil && e != nil {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
close(m.updates) // HL
|
||||
return
|
||||
}
|
||||
|
||||
// STOPNAIVEMERGECLOSE OMIT
|
||||
|
||||
func (m *naiveMerge) Updates() <-chan Item {
|
||||
return m.updates
|
||||
}
|
||||
|
||||
type merge struct {
|
||||
subs []Subscription
|
||||
updates chan Item
|
||||
quit chan struct{}
|
||||
errs chan error
|
||||
}
|
||||
|
||||
// STARTMERGESIG OMIT
|
||||
// Merge returns a Subscription that merges the item streams from subs.
|
||||
// Closing the merged subscription closes subs.
|
||||
func Merge(subs ...Subscription) Subscription {
|
||||
// STOPMERGESIG OMIT
|
||||
m := &merge{
|
||||
subs: subs,
|
||||
updates: make(chan Item),
|
||||
quit: make(chan struct{}),
|
||||
errs: make(chan error),
|
||||
}
|
||||
// STARTMERGE OMIT
|
||||
for _, sub := range subs {
|
||||
go func(s Subscription) {
|
||||
for {
|
||||
var it Item
|
||||
select {
|
||||
case it = <-s.Updates():
|
||||
case <-m.quit: // HL
|
||||
m.errs <- s.Close() // HL
|
||||
return // HL
|
||||
}
|
||||
select {
|
||||
case m.updates <- it:
|
||||
case <-m.quit: // HL
|
||||
m.errs <- s.Close() // HL
|
||||
return // HL
|
||||
}
|
||||
}
|
||||
}(sub)
|
||||
}
|
||||
// STOPMERGE OMIT
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *merge) Updates() <-chan Item {
|
||||
return m.updates
|
||||
}
|
||||
|
||||
// STARTMERGECLOSE OMIT
|
||||
func (m *merge) Close() (err error) {
|
||||
close(m.quit) // HL
|
||||
for _ = range m.subs {
|
||||
if e := <-m.errs; e != nil { // HL
|
||||
err = e
|
||||
}
|
||||
}
|
||||
close(m.updates) // HL
|
||||
return
|
||||
}
|
||||
|
||||
// STOPMERGECLOSE OMIT
|
||||
|
||||
// NaiveDedupe converts a stream of Items that may contain duplicates
|
||||
// into one that doesn't.
|
||||
func NaiveDedupe(in <-chan Item) <-chan Item {
|
||||
out := make(chan Item)
|
||||
go func() {
|
||||
seen := make(map[string]bool)
|
||||
for it := range in {
|
||||
if !seen[it.GUID] {
|
||||
// BUG: this send blocks if the
|
||||
// receiver closes the Subscription
|
||||
// and stops receiving.
|
||||
out <- it // HL
|
||||
seen[it.GUID] = true
|
||||
}
|
||||
}
|
||||
close(out)
|
||||
}()
|
||||
return out
|
||||
}
|
||||
|
||||
type deduper struct {
|
||||
s Subscription
|
||||
updates chan Item
|
||||
closing chan chan error
|
||||
}
|
||||
|
||||
// Dedupe converts a Subscription that may send duplicate Items into
|
||||
// one that doesn't.
|
||||
func Dedupe(s Subscription) Subscription {
|
||||
d := &deduper{
|
||||
s: s,
|
||||
updates: make(chan Item),
|
||||
closing: make(chan chan error),
|
||||
}
|
||||
go d.loop()
|
||||
return d
|
||||
}
|
||||
|
||||
func (d *deduper) loop() {
|
||||
in := d.s.Updates() // enable receive
|
||||
var pending Item
|
||||
var out chan Item // disable send
|
||||
seen := make(map[string]bool)
|
||||
for {
|
||||
select {
|
||||
case it := <-in:
|
||||
if !seen[it.GUID] {
|
||||
pending = it
|
||||
in = nil // disable receive
|
||||
out = d.updates // enable send
|
||||
seen[it.GUID] = true
|
||||
}
|
||||
case out <- pending:
|
||||
in = d.s.Updates() // enable receive
|
||||
out = nil // disable send
|
||||
case errc := <-d.closing:
|
||||
err := d.s.Close()
|
||||
errc <- err
|
||||
close(d.updates)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *deduper) Close() error {
|
||||
errc := make(chan error)
|
||||
d.closing <- errc
|
||||
return <-errc
|
||||
}
|
||||
|
||||
func (d *deduper) Updates() <-chan Item {
|
||||
return d.updates
|
||||
}
|
||||
|
||||
// FakeFetch causes Fetch to use a fake fetcher instead of the real
|
||||
// one.
|
||||
var FakeFetch bool
|
||||
|
||||
// Fetch returns a Fetcher for Items from domain.
|
||||
func Fetch(domain string) Fetcher {
|
||||
if FakeFetch {
|
||||
return fakeFetch(domain)
|
||||
}
|
||||
return realFetch(domain)
|
||||
}
|
||||
|
||||
func fakeFetch(domain string) Fetcher {
|
||||
return &fakeFetcher{channel: domain}
|
||||
}
|
||||
|
||||
type fakeFetcher struct {
|
||||
channel string
|
||||
items []Item
|
||||
}
|
||||
|
||||
// FakeDuplicates causes the fake fetcher to return duplicate items.
|
||||
var FakeDuplicates bool
|
||||
|
||||
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
|
||||
now := time.Now()
|
||||
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
|
||||
item := Item{
|
||||
Channel: f.channel,
|
||||
Title: fmt.Sprintf("Item %d", len(f.items)),
|
||||
}
|
||||
item.GUID = item.Channel + "/" + item.Title
|
||||
f.items = append(f.items, item)
|
||||
if FakeDuplicates {
|
||||
items = f.items
|
||||
} else {
|
||||
items = []Item{item}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// realFetch returns a fetcher for the specified blogger domain.
|
||||
func realFetch(domain string) Fetcher {
|
||||
return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain))
|
||||
}
|
||||
|
||||
type fetcher struct {
|
||||
uri string
|
||||
feed *rss.Feed
|
||||
items []Item
|
||||
}
|
||||
|
||||
// NewFetcher returns a Fetcher for uri.
|
||||
func NewFetcher(uri string) Fetcher {
|
||||
f := &fetcher{
|
||||
uri: uri,
|
||||
}
|
||||
newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
|
||||
newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
|
||||
for _, it := range items {
|
||||
f.items = append(f.items, Item{
|
||||
Channel: ch.Title,
|
||||
GUID: it.Guid,
|
||||
Title: it.Title,
|
||||
})
|
||||
}
|
||||
}
|
||||
f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
|
||||
fmt.Println("fetching", f.uri)
|
||||
if err = f.feed.Fetch(f.uri, nil); err != nil {
|
||||
return
|
||||
}
|
||||
items = f.items
|
||||
f.items = nil
|
||||
next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
|
||||
// support dynamically adding and removing Subscriptions. Reader should dedupe.
|
||||
|
||||
// TODO: in a longer talk: make successive Subscribe calls for the same uri
|
||||
// share the same underlying Subscription, but provide duplicate streams.
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// STARTMAIN OMIT
|
||||
func main() {
|
||||
// STARTMERGECALL OMIT
|
||||
// Subscribe to some feeds, and create a merged update stream.
|
||||
merged := Merge(
|
||||
Subscribe(Fetch("blog.golang.org")),
|
||||
Subscribe(Fetch("googleblog.blogspot.com")),
|
||||
Subscribe(Fetch("googledevelopers.blogspot.com")))
|
||||
// STOPMERGECALL OMIT
|
||||
|
||||
// Close the subscriptions after some time.
|
||||
time.AfterFunc(3*time.Second, func() {
|
||||
fmt.Println("closed:", merged.Close())
|
||||
})
|
||||
|
||||
// Print the stream.
|
||||
for it := range merged.Updates() {
|
||||
fmt.Println(it.Channel, it.Title)
|
||||
}
|
||||
|
||||
// Uncomment the panic below to dump the stack traces. This
|
||||
// will show several stacks for persistent HTTP connections
|
||||
// created by the real RSS client. To clean these up, we'll
|
||||
// need to extend Fetcher with a Close method and plumb this
|
||||
// through the RSS client implementation.
|
||||
//
|
||||
// panic("show me the stacks")
|
||||
}
|
||||
|
||||
// STOPMAIN OMIT
|
28
concurrency/waitgroup/main.go
Normal file
28
concurrency/waitgroup/main.go
Normal file
@ -0,0 +1,28 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
func main() {
|
||||
|
||||
for i := 0; i <= 10; i++ {
|
||||
go print(i)
|
||||
wg.Add(1)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func looprint(count int) {
|
||||
for i := 0; i <= count; i++ {
|
||||
fmt.Println(i)
|
||||
}
|
||||
}
|
||||
|
||||
func print(i int) {
|
||||
fmt.Println(i)
|
||||
wg.Done()
|
||||
}
|
Loading…
Reference in New Issue
Block a user