diff --git a/README.md b/README.md index c8357c1..eaeb1a0 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ ## 参考资料(Design patters Articles) -[go-patterns](https://github.com/nynicg/go-patterns) +[go-patterns](https://github.com/crazybber/go-patterns) [design-pattern-tutorial](https://www.runoob.com/design-pattern/design-pattern-tutorial.html) diff --git a/gomore/05_fan_out/fan_out.go b/gomore/05_fan_out/fan_out.go new file mode 100644 index 0000000..b748c0c --- /dev/null +++ b/gomore/05_fan_out/fan_out.go @@ -0,0 +1,131 @@ +package fanout + +import ( + "context" + "sync" + "sync/atomic" + + "go.uber.org/zap" +) + +var ( + log, _ = zap.NewDevelopment() +) + +const ( + MaxWorkers = 16 + MaxQueueSize = 512 + MasterQueueSize = MaxQueueSize * MaxWorkers +) + +type Pipeline struct { + workers map[int]*worker + chain chan interface{} +} + +func (p *Pipeline) Start(ctx context.Context) { + go func(pipe *Pipeline) { + for { + expectationWorkers := len(pipe.chain) % MaxWorkers + if expectationWorkers >= MaxWorkers { + expectationWorkers = 0 + } + select { + case <-ctx.Done(): + return + case val, ok := <-pipe.chain: + if !ok { + return + } + go pipe.workers[expectationWorkers].stream(val) + } + } + }(p) +} + +func (p *Pipeline) Dispatch(msg interface{}) { + p.chain <- msg +} + +type DispatcherBuilder func() Dispatcher + +func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { + ch := make(chan interface{}, MasterQueueSize) + wk := make(map[int]*worker) + for i := 0; i < MaxWorkers; i++ { + wk[i] = &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + } + } + return &Pipeline{workers: wk, chain: ch} +} + +type Dispatcher interface { + Before(context.Context) error + After() error + Process(interface{}) error +} + +type worker struct { + index uint32 + mutex *sync.Mutex + running bool + chain chan interface{} + debug bool + idle uint32 + Dispatcher +} + +func (c *worker) stream(val interface{}) { + c.chain <- val + if !c.running { + c.mutex.Lock() + c.running = true + ctx, cancel := context.WithCancel(context.Background()) + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) + } + err := c.After() + if err != nil { + log.Error("can not finish track issue", zap.Error(err)) + } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", zap.Error(err)) + } + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil { + err := c.Process(msg) + if err != nil { + log.Error("can not process message", + zap.Any("msg", &msg), + zap.Error(err), + ) + } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return + } + } + } + } + } +} diff --git a/gomore/05_fan_out/fan_out_test.go b/gomore/05_fan_out/fan_out_test.go new file mode 100644 index 0000000..d75c88d --- /dev/null +++ b/gomore/05_fan_out/fan_out_test.go @@ -0,0 +1,52 @@ +fanout + +import concurrency + +type taggingDispatcher struct { + Address string + stream proto.Havilah_StreamMetricClient + conn *grpc.ClientConn +} + +func (d *taggingDispatcher) Before(ctx context.Context) error { + conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) + if err != nil { + return err + } + d.conn = conn + client := proto.NewHavilahClient(conn) + + stream, err := client.StreamMetric(ctx) + if err != nil { + return err + } + d.stream = stream + return nil +} + +func (d *taggingDispatcher) After() error { + _, err := d.stream.CloseAndRecv() + + e := d.conn.Close() + if e != nil { + log.Error("close havilah connection error", field.Error(e)) + } + return err +} + +func (d *taggingDispatcher) Process(msg interface{}) error { + return d.stream.Send(msg.(*proto.Tagging)) +} + + +tagging := &Tagging{ + topic: topic, + pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher { + return &taggingDispatcher{Address: address} + }, ch, idle, debug), +} +tagging.pipeline.Start() + +func main(){ + tagging.pipeline.Dispatch(youStruct{}) +} \ No newline at end of file diff --git a/gomore/05_fan_out/fanout.md b/gomore/05_fan_out/fanout.md new file mode 100644 index 0000000..06541d2 --- /dev/null +++ b/gomore/05_fan_out/fanout.md @@ -0,0 +1,205 @@ +## Implementation +We can activate worker base on traffic of parent channel + +```go +package concurrency + +import ( + "context" + "io" + "sync" + "sync/atomic" + + "bitbucket.org/sakariai/sakari/log" + "bitbucket.org/sakariai/sakari/log/field" +) + +const ( + MaxWorkers = 32 + MaxQueueSize = 128 +) + +var ( + running uint32 = 0 +) + +type Pipeline struct { + workers []*worker + chain chan interface{} +} + +func (p *Pipeline) Start() { + distributeToChannels := func(ch chan interface{}, cs []*worker) { + writer := cs[0] //first worker must stream as default + for { + for _, c := range cs { + expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + select { + case val := <-ch: + runningWorker := atomic.LoadUint32(&running) + if c.index <= runningWorker || c.index <= expectationWorkers { + writer = c + } + if c.debug { + log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + } + go writer.stream(val) + } + } + } + } + + go distributeToChannels(p.chain, p.workers) +} + +func (p *Pipeline) Dispatch(msg interface{}) { + p.chain <- msg +} + +type DispatcherBuilder func() Dispatcher + +func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { + wk := make([]*worker, 0, MaxWorkers) + for i := 0; i < MaxWorkers; i++ { + wk = append(wk, + &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + }) + } + return &Pipeline{workers: wk, chain: ch} +} + +type Dispatcher interface { + Before(context.Context) error + After() error + Process(interface{}) error +} + +type worker struct { + index uint32 + mutex *sync.Mutex + running bool + chain chan interface{} + debug bool + idle uint32 + Dispatcher +} + +func (c *worker) stream(val interface{}) { + c.chain <- val + if !c.running { + c.mutex.Lock() + c.running = true + atomic.AddUint32(&running, 1) + defer atomic.AddUint32(&running, ^uint32(1-1)) + ctx, cancel := context.WithCancel(context.Background()) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", field.Error(err)) + } + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + } + err := c.After() + if err != nil { + log.Error("can not finish track issue", field.Error(err)) + } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil { + err := c.Process(msg) + if err != nil { + log.Error("can not process message", + field.Any("msg", &msg), + field.Error(err), + ) + } + if err == io.EOF { + return + } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return + } + if c.debug { + log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + } + } + } + } + } +} + +``` + +## Usage + +```go +import concurrency + +type taggingDispatcher struct { + Address string + stream proto.Havilah_StreamMetricClient + conn *grpc.ClientConn +} + +func (d *taggingDispatcher) Before(ctx context.Context) error { + conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) + if err != nil { + return err + } + d.conn = conn + client := proto.NewHavilahClient(conn) + + stream, err := client.StreamMetric(ctx) + if err != nil { + return err + } + d.stream = stream + return nil +} + +func (d *taggingDispatcher) After() error { + _, err := d.stream.CloseAndRecv() + + e := d.conn.Close() + if e != nil { + log.Error("close havilah connection error", field.Error(e)) + } + return err +} + +func (d *taggingDispatcher) Process(msg interface{}) error { + return d.stream.Send(msg.(*proto.Tagging)) +} + + +tagging := &Tagging{ + topic: topic, + pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher { + return &taggingDispatcher{Address: address} + }, ch, idle, debug), +} +tagging.pipeline.Start() + +func main(){ + tagging.pipeline.Dispatch(youStruct{}) +} +```