From 53b2e22d7300bd3b8b4acd1e4287f6f92039e861 Mon Sep 17 00:00:00 2001 From: Edward Date: Thu, 7 May 2020 16:52:03 +0800 Subject: [PATCH] basic finish fanout pattern --- gomore/04_fan_in/fan_in_out_test.go | 2 +- gomore/04_fan_in/fan_in_test.go | 2 +- gomore/05_fan_out/fan_out.go | 78 ++++++++++++++++++---------- gomore/05_fan_out/fan_out_test.go | 79 +++++++++++++++++++++++++++-- 4 files changed, 130 insertions(+), 31 deletions(-) diff --git a/gomore/04_fan_in/fan_in_out_test.go b/gomore/04_fan_in/fan_in_out_test.go index c687d06..71dc935 100644 --- a/gomore/04_fan_in/fan_in_out_test.go +++ b/gomore/04_fan_in/fan_in_out_test.go @@ -5,7 +5,7 @@ import ( "testing" ) -func TestFanInOutNumbersSeq(T *testing.T) { +func TestFanInOutNumbersSeq(t *testing.T) { //一路输入源 dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} diff --git a/gomore/04_fan_in/fan_in_test.go b/gomore/04_fan_in/fan_in_test.go index a34da63..570b7d4 100644 --- a/gomore/04_fan_in/fan_in_test.go +++ b/gomore/04_fan_in/fan_in_test.go @@ -5,7 +5,7 @@ import ( "testing" ) -func TestFanInNumbersSeq(T *testing.T) { +func TestFanInNumbersSeq(t *testing.T) { //第一路输入源 dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} diff --git a/gomore/05_fan_out/fan_out.go b/gomore/05_fan_out/fan_out.go index f730160..9df0742 100644 --- a/gomore/05_fan_out/fan_out.go +++ b/gomore/05_fan_out/fan_out.go @@ -1,22 +1,28 @@ package fanout +import "sync" + // Split 重复分发数据为多份 // Split a channel into n channels that receive messages in a round-robin fashion. func Split(ch <-chan int, n int) []chan int { //get a queue of chan - cs := make([]chan int, n) + //cs := make([]chan int, n) //创建了个chan 数组但是空的 - // Distributes the work in a round robin fashion among the stated number - // of channels until the main channel has been closed. In that case, close - // all channels and return. + cs := []chan int{} + for i := 0; i < n; i++ { + cs = append(cs, make(chan int)) + } + + //Distributes the work in a round robin fashion among the stated number of channels + //until the main channel has been closed. In that case, close all channels and return. distributeToChannels := func(ch <-chan int, cs []chan int) { // Close every channel when the execution ends. - defer func(cs []chan int) { + defer func() { for _, c := range cs { close(c) } - }(cs) + }() //this version will block for { @@ -24,7 +30,7 @@ func Split(ch <-chan int, n int) []chan int { select { case val, ok := <-ch: if !ok { - return + return // channel closed } //send value to all channels for _, c := range cs { @@ -42,47 +48,68 @@ func Split(ch <-chan int, n int) []chan int { // Split2 多工作者重复分发 // Split2 a channel into n channels that receive messages in a round-robin fashion. -func Split2(ch <-chan int, n int) []<-chan int { +func Split2(ch <-chan int, n int) []chan int { - cs := make([]chan int, n) + cs := make([]chan int, 0) + for i := 0; i < n; i++ { + cs = append(cs, make(chan int)) + } - //this version will block - for { - //get a target from ch - select { - case val, ok := <-ch: - if !ok { - return cs - } - //send value to all channels + distributeToChannels := func(ch <-chan int, cs []chan int) { + // Close every channel when the execution ends. + defer func() { for _, c := range cs { - go func(tmpV int) { - c <- tmpV + close(c) + } + }() + var wg sync.WaitGroup + for { + //get a target from ch + select { + case val, ok := <-ch: + if !ok { + return + } + wg.Add(1) + go func(v int) { + defer wg.Done() + //send value to all channels + for _, c := range cs { + c <- val + } + }(val) } } + + wg.Wait() } + go distributeToChannels(ch, cs) + return cs } //Split3 随机分发到不同的目的地 //Split3 a channel into n channels that receive messages in a round-robin fashion. -func Split3(ch <-chan int, n int) []<-chan int { +func Split3(ch <-chan int, n int) []chan int { - cs := make([]chan int, n) + cs := make([]chan int, 0) + for i := 0; i < n; i++ { + cs = append(cs, make(chan int)) + } // Distributes the work in a round robin fashion among the stated number // of channels until the main channel has been closed. In that case, close // all channels and return. - distributeToChannels := func(ch <-chan int, cs []chan<- int) { + distributeToChannels := func(ch <-chan int, cs []chan int) { // Close every channel when the execution ends. - defer func(cs []chan<- int) { + defer func() { for _, c := range cs { close(c) } - }(cs) + }() for { for _, c := range cs { @@ -91,7 +118,6 @@ func Split3(ch <-chan int, n int) []<-chan int { if !ok { return } - c <- val } } diff --git a/gomore/05_fan_out/fan_out_test.go b/gomore/05_fan_out/fan_out_test.go index e502d2f..a86785e 100644 --- a/gomore/05_fan_out/fan_out_test.go +++ b/gomore/05_fan_out/fan_out_test.go @@ -6,7 +6,8 @@ import ( "testing" ) -func TestMultiFanOutNumbersSeq(T *testing.T) { +//重复分发 +func TestFanOutDuplicate(t *testing.T) { //一路输入源 dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} @@ -20,9 +21,11 @@ func TestMultiFanOutNumbersSeq(T *testing.T) { // 重复分发 outArray := Split(ch, 3) + length := len(outArray) + t.Log("length of out channel:", length) var wg sync.WaitGroup - wg.Add(len(outArray)) - for i := 0; i < len(outArray); i++ { + wg.Add(length) + for i := 0; i < length; i++ { go func(in <-chan int, index int) { sum := 0 @@ -37,6 +40,76 @@ func TestMultiFanOutNumbersSeq(T *testing.T) { wg.Wait() } +//随机分发 +// worker: 2 11245 +// worker: 0 14988 +// worker: 1 10117 +func TestFanOutRandom(t *testing.T) { + + //一路输入源 + dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} + //generator integer stream + inputChan := gen(dataStreams...) + + // transfer to + ch := sq(inputChan) + + // split it to 3 channel + // 重复分发 + outArray := Split3(ch, 3) + + length := len(outArray) + t.Log("length of out channel:", length) + var wg sync.WaitGroup + wg.Add(length) + for i := 0; i < length; i++ { + + go func(in <-chan int, index int) { + sum := 0 + for item := range in { + sum += item + } + fmt.Println("worker:", index, sum) + + wg.Done() + }(outArray[i], i) + } + wg.Wait() +} + +//多工作者,重复分发 +func TestFanOutDuplicateMultiWorkers(t *testing.T) { + + //一路输入源 + dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} + //generator integer stream + inputChan := gen(dataStreams...) + + // transfer to + ch := sq(inputChan) + + // split it to 3 channel + // 重复分发 + outArray := Split2(ch, 3) + + length := len(outArray) + t.Log("length of out channel:", length) + var wg sync.WaitGroup + wg.Add(length) + for i := 0; i < length; i++ { + + go func(in <-chan int, index int) { + sum := 0 + for item := range in { + sum += item + } + fmt.Println("worker:", index, sum) + + wg.Done() + }(outArray[i], i) + } + wg.Wait() +} func TestManualFanOutNumbersSeq(T *testing.T) { //一路输入源