// Split fans a single input channel out to n newly created output
// channels by duplication: every value received from ch is sent to ALL
// n outputs (broadcast, not round-robin). When ch is closed, all output
// channels are closed so consumers ranging over them terminate.
//
// Fix: the output channels must be allocated individually —
// make([]chan int, n) alone yields nil channels, and a send on a nil
// channel blocks forever, deadlocking the distributor.
func Split(ch <-chan int, n int) []chan int {
	// Allocate every output channel up front.
	cs := make([]chan int, n)
	for i := range cs {
		cs[i] = make(chan int)
	}

	// distributeToChannels copies each incoming value to every output
	// until ch is closed, then closes all outputs so receivers unblock.
	distributeToChannels := func(ch <-chan int, cs []chan int) {
		defer func() {
			for _, c := range cs {
				close(c)
			}
		}()
		// range replaces the previous single-case select loop; it ends
		// exactly when ch is closed and drained.
		for val := range ch {
			for _, c := range cs {
				c <- val
			}
		}
	}

	// One worker goroutine distributes the messages.
	go distributeToChannels(ch, cs)

	return cs
}
// Split2 duplicates every value from ch onto n newly created output
// channels (broadcast), like Split, but returns receive-only channels.
//
// Fixes: the output channels are actually allocated (a bare
// make([]chan int, n) yields nil channels whose sends block forever),
// distribution runs in its own goroutine so the caller is not blocked
// until ch closes, and the outputs are closed once ch is drained so
// consumers ranging over them terminate. The previous per-value send
// goroutines (which leaked and lost ordering) are removed.
func Split2(ch <-chan int, n int) []<-chan int {
	cs := make([]chan int, n)
	out := make([]<-chan int, n)
	for i := range cs {
		cs[i] = make(chan int)
		out[i] = cs[i] // expose each channel as receive-only
	}

	go func() {
		defer func() {
			for _, c := range cs {
				close(c)
			}
		}()
		for val := range ch {
			for _, c := range cs {
				c <- val
			}
		}
	}()

	return out
}

// Split3 splits ch into n channels that receive messages in a
// round-robin fashion: each value goes to exactly one output, rotating
// through the outputs in order. All outputs are closed when ch closes.
//
// Fix: the previous revision did not compile — it passed a []chan int
// where []chan<- int was expected — and never allocated the channels.
func Split3(ch <-chan int, n int) []<-chan int {
	cs := make([]chan int, n)
	out := make([]<-chan int, n)
	for i := range cs {
		cs[i] = make(chan int)
		out[i] = cs[i]
	}

	// distributeToChannels hands each received value to the next output
	// in rotation until ch is closed, then closes every output.
	distributeToChannels := func(ch <-chan int, cs []chan int) {
		defer func() {
			for _, c := range cs {
				close(c)
			}
		}()
		for {
			for _, c := range cs {
				val, ok := <-ch
				if !ok {
					return
				}
				c <- val
			}
		}
	}

	go distributeToChannels(ch, cs)

	return out
}

// gen converts a list of integers to a channel that emits the integers
// in the list. It starts a goroutine that sends the integers on the
// channel and closes the channel when all the values have been sent.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq receives integers from a channel and returns a channel that emits
// the square of each received integer. After the inbound channel is
// closed and this stage has sent all the values downstream, it closes
// the outbound channel.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
res) - errList = append(errList, err) - } - - return out, errList, nil -} - -func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) { - fmt.Println("squareNumbers called") - out := make(chan interface{}, 100) - err := make(chan error, 100) - - go func() { - for v := range in { - num, _ := v.(int) - squared := num * num - select { - case out <- squared: - case <-ctx.Done(): - fmt.Println("ctx done") - return - } - } - close(out) - }() - - return out, err, nil -} - -func fanin(ins []chan interface{}, errList []chan error) (chan interface{}, chan error, error) { - fmt.Println("fanin called") - out := make(chan interface{}, 100) - errout := make(chan error, 100) - var waitgroup sync.WaitGroup - - length := len(ins) + len(errList) - waitgroup.Add(length) - - for _, v := range ins { - go func(w chan interface{}) { - for val := range w { - out <- val - } - waitgroup.Done() - }(v) - } - - for _, e := range errList { - go func(err chan error) { - for v := range err { - errout <- v - } - waitgroup.Done() - }(e) - } - - go func() { - waitgroup.Wait() - close(out) - close(errout) - }() - - return out, errout, nil -} diff --git a/gomore/05_fan_out/fan_out_test.go b/gomore/05_fan_out/fan_out_test.go index bc681e9..e502d2f 100644 --- a/gomore/05_fan_out/fan_out_test.go +++ b/gomore/05_fan_out/fan_out_test.go @@ -1 +1,64 @@ package fanout + +import ( + "fmt" + "sync" + "testing" +) + +func TestMultiFanOutNumbersSeq(T *testing.T) { + + //一路输入源 + dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} + //generator integer stream + inputChan := gen(dataStreams...) 
+ + // transfer to + ch := sq(inputChan) + + // split it to 3 channel + // 重复分发 + outArray := Split(ch, 3) + + var wg sync.WaitGroup + wg.Add(len(outArray)) + for i := 0; i < len(outArray); i++ { + + go func(in <-chan int, index int) { + sum := 0 + for item := range in { + sum += item + } + fmt.Println("worker:", index, sum) + + wg.Done() + }(outArray[i], i) + } + wg.Wait() +} + +func TestManualFanOutNumbersSeq(T *testing.T) { + + //一路输入源 + dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} + // generate the common channel with inputs + inputChan1 := gen(dataStreams...) + inputChan2 := gen(dataStreams...) + + //Manual Fan-out to 2 Go-routine + c1 := sq(inputChan1) + c2 := sq(inputChan2) + + fmt.Print("c1 queue: ") + for n := range c1 { + fmt.Print(n, " ") + } + fmt.Println() + + fmt.Print("c2 queue: ") + for n := range c2 { + fmt.Print(n, " ") + } + fmt.Println() + +} diff --git a/gomore/05_fan_out/fanout.md b/gomore/05_fan_out/fanout.md deleted file mode 100644 index 04377b6..0000000 --- a/gomore/05_fan_out/fanout.md +++ /dev/null @@ -1,206 +0,0 @@ -# Implementation - -We can activate worker base on traffic of parent channel - -```go -package concurrency - -import ( - "context" - "io" - "sync" - "sync/atomic" - - "bitbucket.org/sakariai/sakari/log" - "bitbucket.org/sakariai/sakari/log/field" -) - -const ( - MaxWorkers = 32 - MaxQueueSize = 128 -) - -var ( - running uint32 = 0 -) - -type Pipeline struct { - workers []*worker - chain chan interface{} -} - -func (p *Pipeline) Start() { - distributeToChannels := func(ch chan interface{}, cs []*worker) { - writer := cs[0] //first worker must stream as default - for { - for _, c := range cs { - expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 - select { - case val := <-ch: - runningWorker := atomic.LoadUint32(&running) - if c.index <= runningWorker || c.index <= expectationWorkers { - writer = c - } - if c.debug { - log.Info("Worker receiving", field.Any("index", writer.index), 
field.Any("running", runningWorker), field.Any("no# workers", expectationWorkers)) - } - go writer.stream(val) - } - } - } - } - - go distributeToChannels(p.chain, p.workers) -} - -func (p *Pipeline) Dispatch(msg interface{}) { - p.chain <- msg -} - -type DispatcherBuilder func() Dispatcher - -func NewPipeline(d DispatcherBuilder, ch chan interface{}, idle uint32, debug bool) *Pipeline { - wk := make([]*worker, 0, MaxWorkers) - for i := 0; i < MaxWorkers; i++ { - wk = append(wk, - &worker{ - index: uint32(i + 1), - chain: make(chan interface{}, MaxQueueSize), - mutex: new(sync.Mutex), - debug: debug, - idle: idle, - Dispatcher: d(), - }) - } - return &Pipeline{workers: wk, chain: ch} -} - -type Dispatcher interface { - Before(context.Context) error - After() error - Process(interface{}) error -} - -type worker struct { - index uint32 - mutex *sync.Mutex - running bool - chain chan interface{} - debug bool - idle uint32 - Dispatcher -} - -func (c *worker) stream(val interface{}) { - c.chain <- val - if !c.running { - c.mutex.Lock() - c.running = true - atomic.AddUint32(&running, 1) - defer atomic.AddUint32(&running, ^uint32(1-1)) - ctx, cancel := context.WithCancel(context.Background()) - err := c.Before(ctx) - - if err != nil { - log.Error("can not start worker", field.Error(err)) - } - defer func(w *worker, cancel context.CancelFunc) { - if w.debug { - log.Info("Worker leaving", field.Any("index", w.index), field.Any("idle", w.idle)) - } - err := c.After() - if err != nil { - log.Error("can not finish track issue", field.Error(err)) - } - cancel() - w.mutex.Unlock() - w.running = false - }(c, cancel) - var idle uint32 = 0 - for { - select { - case msg := <-c.chain: - atomic.StoreUint32(&idle, 0) - if msg != nil { - err := c.Process(msg) - if err != nil { - log.Error("can not process message", - field.Any("msg", &msg), - field.Error(err), - ) - } - if err == io.EOF { - return - } - } - default: - atomic.AddUint32(&idle, 1) - if i := atomic.LoadUint32(&idle); i > 0 
{ - if i > c.idle { - return - } - if c.debug { - log.Info("Idle", field.Any("worker index", c.index), field.Any("idle", idle)) - } - } - } - } - } -} - -``` - -## Usage - -```go -import concurrency - -type taggingDispatcher struct { - Address string - stream proto.Havilah_StreamMetricClient - conn *grpc.ClientConn -} - -func (d *taggingDispatcher) Before(ctx context.Context) error { - conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) - if err != nil { - return err - } - d.conn = conn - client := proto.NewHavilahClient(conn) - - stream, err := client.StreamMetric(ctx) - if err != nil { - return err - } - d.stream = stream - return nil -} - -func (d *taggingDispatcher) After() error { - _, err := d.stream.CloseAndRecv() - - e := d.conn.Close() - if e != nil { - log.Error("close havilah connection error", field.Error(e)) - } - return err -} - -func (d *taggingDispatcher) Process(msg interface{}) error { - return d.stream.Send(msg.(*proto.Tagging)) -} - - -tagging := &Tagging{ - topic: topic, - pipeline: concurrency.NewPipeline(func() concurrency.Dispatcher { - return &taggingDispatcher{Address: address} - }, ch, idle, debug), -} -tagging.pipeline.Start() - -func main(){ - tagging.pipeline.Dispatch(youStruct{}) -} -```