diff --git a/messaging/fanout.go b/messaging/fanout.go new file mode 100644 index 0000000..e73240b --- /dev/null +++ b/messaging/fanout.go @@ -0,0 +1,144 @@ +package messaging + +import ( + "context" + "io" + "sync" + "sync/atomic" + + "bitbucket.org/sakariai/sakari/log" + "bitbucket.org/sakariai/sakari/log/field" +) + +const ( + MaxWorkers = 32 + MaxQueueSize = 128 +) + +var ( + running uint32 = 0 +) + +type Pipeline struct { + workers []*worker + chain chan interface{} +} + +func (p *Pipeline) Start() { + distributeToChannels := func(ch chan interface{}, cs []*worker) { + writer := cs[0] //first worker must stream as default + for { + for _, c := range cs { + expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + select { + case val := <-ch: + runningWorker := atomic.LoadUint32(&running) + if c.index <= runningWorker || c.index <= expectationWorkers { + writer = c + } + if c.debug { + log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + } + go writer.stream(val) + } + } + } + } + + go distributeToChannels(p.chain, p.workers) +} + +func (p *Pipeline) Dispatch(msg interface{}) { + p.chain <- msg +} + +type DispatcherBuilder func() Dispatcher + +func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { + wk := make([]*worker, 0, MaxWorkers) + for i := 0; i < MaxWorkers; i++ { + wk = append(wk, + &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + }) + } + return &Pipeline{workers: wk, chain: ch} +} + +type Dispatcher interface { + Before(context.Context) error + After() error + Process(interface{}) error +} + +type worker struct { + index uint32 + mutex *sync.Mutex + running bool + chain chan interface{} + debug bool + idle uint32 + Dispatcher +} + +func (c *worker) stream(val interface{}) { + c.chain <- val + if !c.running { + c.mutex.Lock() + c.running = true + atomic.AddUint32(&running, 1) + defer atomic.AddUint32(&running, ^uint32(1-1)) + ctx, cancel := context.WithCancel(context.Background()) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", field.Error(err)) + } + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + } + err := c.After() + if err != nil { + log.Error("can not finish track issue", field.Error(err)) + } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil { + err := c.Process(msg) + if err != nil { + log.Error("can not process message", + field.Any("msg", &msg), + field.Error(err), + ) + } + if err == io.EOF { + return + } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return + } + if c.debug { + log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + } + } + } + } + } +} diff --git a/messaging/fanout.md b/messaging/fanout.md new file mode 100644 index 0000000..06541d2 --- /dev/null +++ b/messaging/fanout.md @@ -0,0 +1,205 @@ +## Implementation +We can activate worker base on traffic of parent channel + +```go +package concurrency + +import ( + "context" + "io" + "sync" + "sync/atomic" + + "bitbucket.org/sakariai/sakari/log" + "bitbucket.org/sakariai/sakari/log/field" +) + +const ( + MaxWorkers = 32 + MaxQueueSize = 128 +) + +var ( + running uint32 = 0 +) + +type Pipeline struct { + workers []*worker + chain chan interface{} +} + +func (p *Pipeline) Start() { + distributeToChannels := func(ch chan interface{}, cs []*worker) { + writer := cs[0] //first worker must stream as default + for { + for _, c := range cs { + expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + select { + case val := <-ch: + runningWorker := atomic.LoadUint32(&running) + if c.index <= runningWorker || c.index <= expectationWorkers { + writer = c + } + if c.debug { + log.Info("Worker receiving", field.Any("index", writer.index), field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) + } + go writer.stream(val) + } + } + } + } + + go distributeToChannels(p.chain, p.workers) +} + +func (p *Pipeline) Dispatch(msg interface{}) { + p.chain <- msg +} + +type DispatcherBuilder func() Dispatcher + +func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { + wk := make([]*worker, 0, MaxWorkers) + for i := 0; i < MaxWorkers; i++ { + wk = append(wk, + &worker{ + index: uint32(i + 1), + chain: make(chan interface{}, MaxQueueSize), + mutex: new(sync.Mutex), + debug: debug, + idle: idle, + Dispatcher: d(), + }) + } + return &Pipeline{workers: wk, chain: ch} +} + +type Dispatcher interface { + Before(context.Context) error + After() error + Process(interface{}) error +} + +type worker struct { + index uint32 + mutex *sync.Mutex + running bool + chain chan interface{} + debug bool + idle uint32 + Dispatcher +} + +func (c *worker) stream(val interface{}) { + c.chain <- val + if !c.running { + c.mutex.Lock() + c.running = true + atomic.AddUint32(&running, 1) + defer atomic.AddUint32(&running, ^uint32(1-1)) + ctx, cancel := context.WithCancel(context.Background()) + err := c.Before(ctx) + + if err != nil { + log.Error("can not start worker", field.Error(err)) + } + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) + } + err := c.After() + if err != nil { + log.Error("can not finish track issue", field.Error(err)) + } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil { + err := c.Process(msg) + if err != nil { + log.Error("can not process message", + field.Any("msg", &msg), + field.Error(err), + ) + } + if err == io.EOF { + return + } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return + } + if c.debug { + log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) + } + } + } + } + } +} + +``` + +## Usage + +```go +import concurrency + +type taggingDispatcher struct { + Address string + stream proto.Havilah_StreamMetricClient + conn *grpc.ClientConn +} + +func (d *taggingDispatcher) Before(ctx context.Context) error { + conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) + if err != nil { + return err + } + d.conn = conn + client := proto.NewHavilahClient(conn) + + stream, err := client.StreamMetric(ctx) + if err != nil { + return err + } + d.stream = stream + return nil +} + +func (d *taggingDispatcher) After() error { + _, err := d.stream.CloseAndRecv() + + e := d.conn.Close() + if e != nil { + log.Error("close havilah connection error", field.Error(e)) + } + return err +} + +func (d *taggingDispatcher) Process(msg interface{}) error { + return d.stream.Send(msg.(*proto.Tagging)) +} + + +tagging := &Tagging{ + topic: topic, + pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher { + return &taggingDispatcher{Address: address} + }, ch, idle, debug), +} +tagging.pipeline.Start() + +func main(){ + tagging.pipeline.Dispatch(youStruct{}) +} +```