diff --git a/messaging/fanout.go b/messaging/fanout.go index e73240b..e52785b 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -2,12 +2,10 @@ package messaging import ( "context" + "go.uber.org/zap" "io" "sync" "sync/atomic" - - "bitbucket.org/sakariai/sakari/log" - "bitbucket.org/sakariai/sakari/log/field" ) const ( @@ -17,6 +15,7 @@ const ( var ( running uint32 = 0 + log, _ = zap.NewDevelopment() ) type Pipeline struct { @@ -37,7 +36,7 @@ func (p *Pipeline) Start() { writer = c } if c.debug { - log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) } go writer.stream(val) } @@ -54,8 +53,9 @@ func (p *Pipeline) Dispatch(msg interface{}) { type DispatcherBuilder func() Dispatcher -func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { +func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { wk := make([]*worker, 0, MaxWorkers) + ch := make(chan interface{}, 4096) for i := 0; i < MaxWorkers; i++ { wk = append(wk, &worker{ @@ -97,15 +97,15 @@ func (c *worker) stream(val interface{}) { err := c.Before(ctx) if err != nil { - log.Error("can not start worker", field.Error(err)) + log.Error("can not start worker", zap.Error(err)) } defer func(w *worker, cancel context.CancelFunc) { if w.debug { - log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) } err := c.After() if err != nil { - log.Error("can not finish track issue", field.Error(err)) + log.Error("can not finish track issue", zap.Error(err)) } cancel() w.mutex.Unlock() @@ -120,8 +120,8 @@ func (c *worker) stream(val interface{}) { err := c.Process(msg) if err != nil { log.Error("can not process message", - field.Any("msg", &msg), - field.Error(err), + zap.Any("msg", &msg), + zap.Error(err), ) } if err == io.EOF { @@ -135,7 +135,7 @@ func (c *worker) stream(val interface{}) { return } if c.debug { - log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + log.Info("Idle", zap.Any("worker index", c.index), zap.Any("idle", idle)) } } } diff --git a/messaging/fanout.md b/messaging/fanout.md index 06541d2..c561e81 100644 --- a/messaging/fanout.md +++ b/messaging/fanout.md @@ -1,153 +1,9 @@ +Fan-Out Design Patterns +=================================== ## Implementation -We can activate worker base on traffic of parent channel - -```go -package concurrency - -import ( - "context" - "io" - "sync" - "sync/atomic" - - "bitbucket.org/sakariai/sakari/log" - "bitbucket.org/sakariai/sakari/log/field" -) - -const ( - MaxWorkers = 32 - MaxQueueSize = 128 -) - -var ( - running uint32 = 0 -) - -type Pipeline struct { - workers []*worker - chain chan interface{} -} - -func (p *Pipeline) Start() { - distributeToChannels := func(ch chan interface{}, cs []*worker) { - writer := cs[0] //first worker must stream as default - for { - for _, c := range cs { - expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 - select { - case val := <-ch: - runningWorker := atomic.LoadUint32(&running) - if c.index <= runningWorker || c.index <= expectationWorkers { - writer = c - } - if c.debug { - log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) - } - go writer.stream(val) - } - } - } - } - - go distributeToChannels(p.chain, p.workers) -} - -func (p *Pipeline) Dispatch(msg interface{}) { - p.chain <- msg -} - -type DispatcherBuilder func() Dispatcher - -func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { - wk := make([]*worker, 0, MaxWorkers) - for i := 0; i < MaxWorkers; i++ { - wk = append(wk, - &worker{ - index: uint32(i + 1), - chain: make(chan interface{}, MaxQueueSize), - mutex: new(sync.Mutex), - debug: debug, - idle: idle, - Dispatcher: d(), - }) - } - return &Pipeline{workers: wk, chain: ch} -} - -type Dispatcher interface { - Before(context.Context) error - After() error - Process(interface{}) error -} - -type worker struct { - index uint32 - mutex *sync.Mutex - running bool - chain chan interface{} - debug bool - idle uint32 - Dispatcher -} - -func (c *worker) stream(val interface{}) { - c.chain <- val - if !c.running { - c.mutex.Lock() - c.running = true - atomic.AddUint32(&running, 1) - defer atomic.AddUint32(&running, ^uint32(1-1)) - ctx, cancel := context.WithCancel(context.Background()) - err := c.Before(ctx) - - if err != nil { - log.Error("can not start worker", field.Error(err)) - } - defer func(w *worker, cancel context.CancelFunc) { - if w.debug { - log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) - } - err := c.After() - if err != nil { - log.Error("can not finish track issue", field.Error(err)) - } - cancel() - w.mutex.Unlock() - w.running = false - }(c, cancel) - var idle uint32 = 0 - for { - select { - case msg := <-c.chain: - atomic.StoreUint32(&idle, 0) - if msg != nil { - err := c.Process(msg) - if err != nil { - log.Error("can not process message", - field.Any("msg", &msg), - field.Error(err), - ) - } - if err == io.EOF { - return - } - } - default: - atomic.AddUint32(&idle, 1) - if i := atomic.LoadUint32(&idle); i > 0 { - if i > c.idle { - return - } - if c.debug { - log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) - } - } - } - } - } -} - -``` +We can activate worker based on traffic of parent channel +`NewPipeline(d DispatcherBuilder, idle uint32, debug bool)` +* Set `idle` around 1000-2000 for deactivate worker in select block ## Usage