From 80cb3ddd5e0d82935973996503701cfd188c3c10 Mon Sep 17 00:00:00 2001
From: Edward
Date: Thu, 7 May 2020 12:20:04 +0800
Subject: [PATCH] remove useless code

---
 README.md                         |   2 +-
 gomore/05_fan_out/fan_out_test.go |  51 ------
 gomore/05_fan_out/fanout.md       | 265 +++++++++++++++---------------
 3 files changed, 134 insertions(+), 184 deletions(-)

diff --git a/README.md b/README.md
index 73a3103..68f9469 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,7 @@
 + [x] [发布订阅模式(Pub-Sub)](./gomore/01_messages)
 + [x] [时差模式(Time Profile)](./gomore/02_profiles)
 + [x] [上下文模式(Context)](./gomore/03_context)
-+ [ ] [WIP][淡入模式(Fan-In)](./gomore/04_fan_in)
++ [x] [淡入模式(Fan-In)](./gomore/04_fan_in)
 + [ ] [WIP][淡出模式(Fan-Out)](./gomore/05_fan_out)
 + [ ] [WIP][熔断模式(circuit breaker)](./gomore/06_circuit_breaker)
 + [x] [限流模式(rate limiting)](./gomore/07_rate_limiting)

diff --git a/gomore/05_fan_out/fan_out_test.go b/gomore/05_fan_out/fan_out_test.go
index 1d67a37..bc681e9 100644
--- a/gomore/05_fan_out/fan_out_test.go
+++ b/gomore/05_fan_out/fan_out_test.go
@@ -1,52 +1 @@
 package fanout
-
-import "concurrency"
-
-type taggingDispatcher struct {
-	Address string
-	stream  proto.Havilah_StreamMetricClient
-	conn    *grpc.ClientConn
-}
-
-func (d *taggingDispatcher) Before(ctx context.Context) error {
-	conn, err := grpc.Dial(d.Address, grpc.WithInsecure())
-	if err != nil {
-		return err
-	}
-	d.conn = conn
-	client := proto.NewHavilahClient(conn)
-
-	stream, err := client.StreamMetric(ctx)
-	if err != nil {
-		return err
-	}
-	d.stream = stream
-	return nil
-}
-
-func (d *taggingDispatcher) After() error {
-	_, err := d.stream.CloseAndRecv()
-
-	e := d.conn.Close()
-	if e != nil {
-		log.Error("close havilah connection error", field.Error(e))
-	}
-	return err
-}
-
-func (d *taggingDispatcher) Process(msg interface{}) error {
-	return d.stream.Send(msg.(*proto.Tagging))
-}
-
-
-tagging := &Tagging{
-	topic: topic,
-	pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher {
-		return &taggingDispatcher{Address: address}
-	}, ch, idle, debug),
-}
-tagging.pipeline.Start()
-
-func main(){
-	tagging.pipeline.Dispatch(youStruct{})
-}

diff --git a/gomore/05_fan_out/fanout.md b/gomore/05_fan_out/fanout.md
index 06541d2..04377b6 100644
--- a/gomore/05_fan_out/fanout.md
+++ b/gomore/05_fan_out/fanout.md
@@ -1,150 +1,151 @@
-## Implementation
+# Implementation
+
We can activate workers based on the traffic of the parent channel:

```go
package concurrency

import (
	"context"
	"io"
	"sync"
	"sync/atomic"

	"bitbucket.org/sakariai/sakari/log"
	"bitbucket.org/sakariai/sakari/log/field"
)

const (
	MaxWorkers   = 32
	MaxQueueSize = 128
)

// running counts the workers that currently own a processing loop.
var running uint32

type Pipeline struct {
	workers []*worker
	chain   chan interface{}
}

func (p *Pipeline) Start() {
	distributeToChannels := func(ch chan interface{}, cs []*worker) {
		writer := cs[0] // the first worker streams by default
		for {
			for _, c := range cs {
				// Scale the target worker count with queue depth: one more
				// worker for every MaxQueueSize/MaxWorkers queued messages.
				expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1
				val := <-ch
				runningWorker := atomic.LoadUint32(&running)
				if c.index <= runningWorker || c.index <= expectationWorkers {
					writer = c
				}
				if c.debug {
					log.Info("Worker receiving",
						field.Any("index", writer.index),
						field.Any("running", runningWorker),
						field.Any("no# workers", expectationWorkers))
				}
				go writer.stream(val)
			}
		}
	}

	go distributeToChannels(p.chain, p.workers)
}

func (p *Pipeline) Dispatch(msg interface{}) {
	p.chain <- msg
}

type DispatcherBuilder func() Dispatcher

func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline {
	wk := make([]*worker, 0, MaxWorkers)
	for i := 0; i < MaxWorkers; i++ {
		wk = append(wk,
			&worker{
				index:      uint32(i + 1),
				chain:      make(chan interface{}, MaxQueueSize),
				mutex:      new(sync.Mutex),
				debug:      debug,
				idle:       idle,
				Dispatcher: d(),
			})
	}
	return &Pipeline{workers: wk, chain: ch}
}

type Dispatcher interface {
	Before(context.Context) error
	After() error
	Process(interface{}) error
}

type worker struct {
	index   uint32
	mutex   *sync.Mutex
	running bool
	chain   chan interface{}
	debug   bool
	idle    uint32
	Dispatcher
}

func (c *worker) stream(val interface{}) {
	c.chain <- val
	if !c.running { // best-effort fast path; the mutex below serializes actual start-up
		c.mutex.Lock()
		c.running = true
		atomic.AddUint32(&running, 1)
		defer atomic.AddUint32(&running, ^uint32(0)) // decrement running by one on exit

		ctx, cancel := context.WithCancel(context.Background())
		if err := c.Before(ctx); err != nil {
			log.Error("can not start worker", field.Error(err))
		}
		defer func(w *worker, cancel context.CancelFunc) {
			if w.debug {
				log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle))
			}
			if err := w.After(); err != nil {
				log.Error("can not finish track issue", field.Error(err))
			}
			cancel()
			w.running = false // clear the flag before unlocking so a later stream() can restart us
			w.mutex.Unlock()
		}(c, cancel)

		var idle uint32
		for {
			select {
			case msg := <-c.chain:
				atomic.StoreUint32(&idle, 0)
				if msg != nil {
					err := c.Process(msg)
					if err == io.EOF {
						return
					}
					if err != nil {
						log.Error("can not process message",
							field.Any("msg", &msg),
							field.Error(err),
						)
					}
				}
			default:
				// Nothing queued: count one idle tick and exit once the
				// configured idle budget is exhausted.
				if i := atomic.AddUint32(&idle, 1); i > c.idle {
					return
				}
				if c.debug {
					log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", atomic.LoadUint32(&idle)))
				}
			}
		}
	}
}
```
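As a sanity check on the scaling heuristic in `distributeToChannels`: with the constants above, `MaxQueueSize/MaxWorkers = 4`, so the distributor allows one extra worker for every four queued messages, and worker indexes implicitly cap the count at `MaxWorkers`. A small sketch of the arithmetic (not part of the patch):

```go
package main

import "fmt"

const (
	MaxWorkers   = 32
	MaxQueueSize = 128
)

func main() {
	// expectationWorkers exactly as computed in distributeToChannels,
	// for a few sample queue depths.
	for _, depth := range []int{0, 4, 16, 64, 128} {
		expectation := uint32(depth/(MaxQueueSize/MaxWorkers)) + 1
		if expectation > MaxWorkers {
			// the cap is implicit in the real code: worker indexes stop at MaxWorkers
			expectation = MaxWorkers
		}
		fmt.Printf("queue depth %3d -> up to %2d workers\n", depth, expectation)
	}
}
```

So an empty queue keeps a single worker alive, while a full queue of 128 messages engages all 32.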
@@ -155,39 +156,39 @@ func (c *worker) stream(val interface{}) {

```go
import (
	"context"

	"concurrency"

	"google.golang.org/grpc"
	// proto (the Havilah gRPC client) and log/field are project-specific
	// packages whose import paths are not shown in the original.
)

type taggingDispatcher struct {
	Address string
	stream  proto.Havilah_StreamMetricClient
	conn    *grpc.ClientConn
}

// Before dials the gRPC endpoint and opens the metric stream when a worker starts.
func (d *taggingDispatcher) Before(ctx context.Context) error {
	conn, err := grpc.Dial(d.Address, grpc.WithInsecure())
	if err != nil {
		return err
	}
	d.conn = conn
	client := proto.NewHavilahClient(conn)

	stream, err := client.StreamMetric(ctx)
	if err != nil {
		return err
	}
	d.stream = stream
	return nil
}

// After closes the stream and the underlying connection when the worker leaves.
func (d *taggingDispatcher) After() error {
	_, err := d.stream.CloseAndRecv()

	if e := d.conn.Close(); e != nil {
		log.Error("close havilah connection error", field.Error(e))
	}
	return err
}

// Process forwards one message to the stream; messages must be *proto.Tagging.
func (d *taggingDispatcher) Process(msg interface{}) error {
	return d.stream.Send(msg.(*proto.Tagging))
}
```

@@ -200,6 +201,6 @@ tagging := &Tagging{

```go
// Minimal assumed definition: the original does not show Tagging, and topic,
// address, ch, idle and debug are assumed to be defined elsewhere.
type Tagging struct {
	topic    string
	pipeline *concurrency.Pipeline
}

func main() {
	tagging := &Tagging{
		topic: topic,
		pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher {
			return &taggingDispatcher{Address: address}
		}, ch, idle, debug),
	}
	tagging.pipeline.Start()

	// The original dispatched youStruct{}; Process type-asserts *proto.Tagging,
	// so that is what must be sent.
	tagging.pipeline.Dispatch(&proto.Tagging{})
}
```
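The gRPC example above needs a running Havilah endpoint. For experimenting with the pipeline itself, here is a minimal, dependency-free sketch; the `printDispatcher` type, the import path for the `concurrency` package, and the idle budget of 1000 are illustrative assumptions, not part of the original code:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"concurrency" // adjust to wherever the package above lives in your module
)

// printDispatcher is a hypothetical Dispatcher that writes every message to stdout.
type printDispatcher struct{}

func (p *printDispatcher) Before(ctx context.Context) error {
	fmt.Println("worker starting")
	return nil
}

func (p *printDispatcher) After() error {
	fmt.Println("worker leaving")
	return nil
}

func (p *printDispatcher) Process(msg interface{}) error {
	fmt.Println("processing:", msg)
	return nil
}

func main() {
	ch := make(chan interface{}, concurrency.MaxQueueSize)
	pipeline := concurrency.NewPipeline(func() concurrency.Dispatcher {
		return &printDispatcher{}
	}, ch, 1000, false) // idle budget of 1000 polls; debug off

	pipeline.Start()
	for i := 0; i < 100; i++ {
		pipeline.Dispatch(fmt.Sprintf("message-%d", i))
	}

	// The pipeline exposes no explicit shutdown; sleep briefly so the
	// asynchronous workers can drain before the process exits.
	time.Sleep(time.Second)
}
```

Because `Dispatch` only enqueues, a real caller needs its own lifecycle management before process exit.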