From 62bdf96dcda220e5b741320f56077ea671496135 Mon Sep 17 00:00:00 2001 From: Edward Date: Thu, 7 May 2020 11:55:36 +0800 Subject: [PATCH] finish fan_in pattern --- gomore/04_fan_in/fan_in.go | 53 +++++++++++++++++++++++---------- gomore/04_fan_in/fan_in.md | 41 ------------------------- gomore/04_fan_in/fan_in_test.go | 32 ++++++++++++-------- 3 files changed, 57 insertions(+), 69 deletions(-) delete mode 100644 gomore/04_fan_in/fan_in.md diff --git a/gomore/04_fan_in/fan_in.go b/gomore/04_fan_in/fan_in.go index 84dcfcc..e39f331 100644 --- a/gomore/04_fan_in/fan_in.go +++ b/gomore/04_fan_in/fan_in.go @@ -1,11 +1,46 @@ package fanin -func generatePipeline(numbers []int) <-chan int { +import "sync" + +// Merge operate a FanIn to compose different channels into one +func Merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + + out := make(chan int, 3) + + wg.Add(len(cs)) + + // Start an send goroutine for each input channel in cs. send + // copies values from c to out until c is closed, then calls wg.Done. + send := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + + //启动多个 go routine 开始工作 + for _, c := range cs { + go send(c) + } + // Start a goroutine to close out once all the send goroutines are + // done. This must start after the wg.Add call. + //关闭动作,放在发送一方,会更好 + go func() { + wg.Wait() + close(out) + }() + + return out +} + +func generateNumbersPipeline(numbers []int) <-chan int { out := make(chan int) go func() { for _, n := range numbers { out <- n } + //发送完成之后关闭 close(out) }() return out @@ -17,22 +52,8 @@ func squareNumber(in <-chan int) <-chan int { for n := range in { out <- n * n } + //发送完成之后关闭 close(out) }() return out } - -func fanIn(input1, input2 <-chan int) <-chan int { - c := make(chan int) - go func() { - for { - select { - case s := <-input1: - c <- s - case s := <-input2: - c <- s - } - } - }() - return c -} diff --git a/gomore/04_fan_in/fan_in.md b/gomore/04_fan_in/fan_in.md deleted file mode 100644 index 2862bda..0000000 --- a/gomore/04_fan_in/fan_in.md +++ /dev/null @@ -1,41 +0,0 @@ -# Fan-In Messaging Patterns - -=================================== -Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination). - -We can model fan-in using the Go channels. - -```go -// Merge different channels in one channel -func Merge(cs ...<-chan int) <-chan int { - var wg sync.WaitGroup - - out := make(chan int) - - // Start an send goroutine for each input channel in cs. send - // copies values from c to out until c is closed, then calls wg.Done. - send := func(c <-chan int) { - for n := range c { - out <- n - } - wg.Done() - } - - wg.Add(len(cs)) - for _, c := range cs { - go send(c) - } - - // Start a goroutine to close out once all the send goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - return out -} -``` - -The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel. - -Once all the output goroutines have been started, `Merge` a goroutine is started to close the main channel. diff --git a/gomore/04_fan_in/fan_in_test.go b/gomore/04_fan_in/fan_in_test.go index 8cc4d4e..2717f38 100644 --- a/gomore/04_fan_in/fan_in_test.go +++ b/gomore/04_fan_in/fan_in_test.go @@ -5,22 +5,30 @@ import ( "testing" ) -func TestFanIn(T *testing.T) { - randomNumbers := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} +func TestMergeDataSeq(T *testing.T) { + + //第一路输入源 + dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} // generate the common channel with inputs - inputChan := generatePipeline(randomNumbers) + inputChan1 := generateNumbersPipeline(dataStreams1) - // Fan-out to 2 Go-routine - c1 := squareNumber(inputChan) - c2 := squareNumber(inputChan) + //第二路输入源 + dataStreams2 := []int{2, 4, 6, 9, 1, 1, 2, 3, 7, 8} + + inputChan2 := generateNumbersPipeline(dataStreams2) + + c1 := squareNumber(inputChan1) + + c2 := squareNumber(inputChan2) + + //fanIn data for the squared numbers + out := Merge(c1, c2) - // Fan-in the resulting squared numbers - c := fanIn(c1, c2) sum := 0 - // Do the summation - for i := 0; i < len(randomNumbers); i++ { - sum += <-c + for c := range out { + sum += c } - fmt.Printf("Total Sum of Squares: %d", sum) + + fmt.Printf("Total Sum of Squares by FanIn : %d\n", sum) }