diff --git a/gomore/05_fan_out/fan_out.go b/gomore/05_fan_out/fan_out.go index 9df0742..4e89546 100644 --- a/gomore/05_fan_out/fan_out.go +++ b/gomore/05_fan_out/fan_out.go @@ -2,6 +2,47 @@ package fanout import "sync" +// Split2 多工作者重复分发,每个worker分发一波数据 +// Split2 a channel into n channels that receive messages in a round-robin fashion. +func Split2(ch <-chan int, n int) []chan int { + + cs := []chan int{} + for i := 0; i < n; i++ { + cs = append(cs, make(chan int)) + } + + var wg sync.WaitGroup + //Distributes one value to channels + distributeToChannels := func(ch <-chan int, cs []chan int) { + // // Close every channel when the execution ends. + + //get a target from ch + val, ok := <-ch + if !ok { + return // channel closed + } + //send value to all channels + for _, c := range cs { + c <- val + } + wg.Done() + } + + for i := 0; i < n; i++ { + wg.Add(1) + // a worker to distribute message + go distributeToChannels(ch, cs) + } + + go func() { + wg.Wait() + for _, c := range cs { + close(c) + } + }() + return cs +} + // Split 重复分发数据为多份 // Split a channel into n channels that receive messages in a round-robin fashion. func Split(ch <-chan int, n int) []chan int { @@ -46,51 +87,6 @@ func Split(ch <-chan int, n int) []chan int { return cs } -// Split2 多工作者重复分发 -// Split2 a channel into n channels that receive messages in a round-robin fashion. -func Split2(ch <-chan int, n int) []chan int { - - cs := make([]chan int, 0) - for i := 0; i < n; i++ { - cs = append(cs, make(chan int)) - } - - distributeToChannels := func(ch <-chan int, cs []chan int) { - // Close every channel when the execution ends. - defer func() { - for _, c := range cs { - close(c) - } - }() - var wg sync.WaitGroup - for { - //get a target from ch - select { - case val, ok := <-ch: - if !ok { - return - } - wg.Add(1) - go func(v int) { - defer wg.Done() - //send value to all channels - for _, c := range cs { - c <- val - } - - }(val) - - } - } - - wg.Wait() - } - - go distributeToChannels(ch, cs) - - return cs -} - //Split3 随机分发到不同的目的地 //Split3 a channel into n channels that receive messages in a round-robin fashion. func Split3(ch <-chan int, n int) []chan int { diff --git a/gomore/05_fan_out/fan_out_test.go b/gomore/05_fan_out/fan_out_test.go index a86785e..72a8f86 100644 --- a/gomore/05_fan_out/fan_out_test.go +++ b/gomore/05_fan_out/fan_out_test.go @@ -6,7 +6,41 @@ import ( "testing" ) -//重复分发 +//多工作者,重复分发 +func TestFanOutDuplicateMultiWorkers(t *testing.T) { + + //一路输入源 + dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} + //generator integer stream + inputChan := gen(dataStreams...) + + // transfer to + ch := sq(inputChan) + + // split it to 3 channel + // 重复分发 + outArray := Split2(ch, 3) + + length := len(outArray) + t.Log("length of out channel:", length) + var wg sync.WaitGroup + wg.Add(length) + for i := 0; i < length; i++ { + + go func(in <-chan int, index int) { + sum := 0 + for item := range in { + sum += item + } + fmt.Println("worker:", index, sum) + + wg.Done() + }(outArray[i], i) + } + wg.Wait() +} + +//单个工作者,重复分发 func TestFanOutDuplicate(t *testing.T) { //一路输入源 @@ -77,39 +111,6 @@ func TestFanOutRandom(t *testing.T) { wg.Wait() } -//多工作者,重复分发 -func TestFanOutDuplicateMultiWorkers(t *testing.T) { - - //一路输入源 - dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} - //generator integer stream - inputChan := gen(dataStreams...) - - // transfer to - ch := sq(inputChan) - - // split it to 3 channel - // 重复分发 - outArray := Split2(ch, 3) - - length := len(outArray) - t.Log("length of out channel:", length) - var wg sync.WaitGroup - wg.Add(length) - for i := 0; i < length; i++ { - - go func(in <-chan int, index int) { - sum := 0 - for item := range in { - sum += item - } - fmt.Println("worker:", index, sum) - - wg.Done() - }(outArray[i], i) - } - wg.Wait() -} func TestManualFanOutNumbersSeq(T *testing.T) { //一路输入源