barrier pattern

This commit is contained in:
nynicg 2019-05-10 14:57:25 +08:00
parent 8bb1b4d337
commit 9968190840

View File

@ -6,40 +6,69 @@ import (
) )
type IBarrier interface { type IBarrier interface {
// error for timeout if need
Await() error Await() error
} }
type Barrier struct {
cond *sync.Cond // once
gos uint type ChanBarrier struct {
curgos uint gos int
curGos int
ch chan struct{}
m sync.Mutex
} }
func NewBarrier(syncGos uint)*Barrier{ func NewChanBarrier(gos int)*ChanBarrier{
if syncGos < 1{ return &ChanBarrier{
panic("min 1") gos:gos,
} ch:make(chan struct{}),
l := &sync.Mutex{}
c := sync.NewCond(l)
return &Barrier{
cond:c,
gos:syncGos,
} }
} }
func (b *Barrier)Await() error{ func (c *ChanBarrier) Await() error {
b.cond.L.Lock() c.m.Lock()
defer b.cond.L.Unlock() if c.curGos++;c.gos != c.curGos{
b.curgos++ c.m.Unlock()
if b.gos != b.curgos{ <- c.ch
b.cond.Wait()
}else{ }else{
b.curgos = 0 c.m.Unlock()
b.cond.Broadcast() close(c.ch)
} }
return nil return nil
} }
//type Barrier struct {
// cond *sync.Cond
// gos uint
// curgos uint
//}
//
//func NewBarrier(syncGos uint)*Barrier{
// if syncGos < 1{
// panic("min 1")
// }
// l := &sync.Mutex{}
// c := sync.NewCond(l)
// return &Barrier{
// cond:c,
// gos:syncGos,
// }
//}
//
//func (b *Barrier)Await() error{
// b.cond.L.Lock()
// defer b.cond.L.Unlock()
// b.curgos++
// if b.gos != b.curgos{
// b.cond.Wait()
// }else{
// b.curgos = 0
// b.cond.Broadcast()
// }
// return nil
//}
func main(){ func main(){
@ -47,7 +76,8 @@ func main(){
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
gos := 10 gos := 10
wg.Add(gos) wg.Add(gos)
b = NewBarrier(uint(gos)) //b = NewBarrier(uint(gos))
b = NewChanBarrier(gos)
for i:=0;i<gos ;i++ { for i:=0;i<gos ;i++ {
go func() { go func() {
log.Println("await") log.Println("await")
@ -59,72 +89,5 @@ func main(){
}() }()
} }
wg.Wait() wg.Wait()
//b = NewTimeoutBarrier(time.Second ,2)
//wg.Add(2)
//
//go func() {
// log.Println("await 1")
// if err := b.Await();err != nil{
// log.Println(err)
// }
// log.Println("pass 1")
// wg.Done()
//}()
//go func() {
// log.Println("await 2")
// // 导致前一个await超时
// time.Sleep(time.Second*4)
// if err := b.Await();err != nil{
// log.Println(err)
// }
// log.Println("pass 2")
// wg.Done()
//}()
//wg.Wait()
} }
//// only use once
//type TimeoutBarrier struct {
// ch chan struct{}
// t time.Duration
// gos uint
// arrgos uint
// togos int
// m sync.Mutex
//}
//
//func NewTimeoutBarrier(t time.Duration ,syncgos uint)*TimeoutBarrier{
// if t == 0{
// t = math.MaxInt64
// }
// if syncgos < 1{
// panic("min goroutine num = 1")
// }
// return &TimeoutBarrier{
// ch:make(chan struct{}),
// t:t,
// gos:syncgos,
// }
//}
//
//func (b *TimeoutBarrier)Await() (err error){
// b.m.Lock()
// b.arrgos++
// if b.arrgos != b.gos{
// b.m.Unlock()
// select {
// case <- b.ch:
// case <- time.After(b.t):
// b.togos++
// err = errors.New("time out")
// }
// }else{
// for i:=0;i<int(b.gos)-1-b.togos;i++{
// b.ch <- struct{}{}
// }
// close(b.ch)
// b.m.Unlock()
// }
// return err
//}