finish fan_in pattern

This commit is contained in:
Edward 2020-05-07 11:55:36 +08:00
parent d020f12a2b
commit 62bdf96dcd
3 changed files with 57 additions and 69 deletions

View File

@ -1,11 +1,46 @@
package fanin
func generatePipeline(numbers []int) <-chan int {
import "sync"
// Merge operate a FanIn to compose different channels into one
func Merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 3)
wg.Add(len(cs))
// Start an send goroutine for each input channel in cs. send
// copies values from c to out until c is closed, then calls wg.Done.
send := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
//启动多个 go routine 开始工作
for _, c := range cs {
go send(c)
}
// Start a goroutine to close out once all the send goroutines are
// done. This must start after the wg.Add call.
//关闭动作,放在发送一方,会更好
go func() {
wg.Wait()
close(out)
}()
return out
}
func generateNumbersPipeline(numbers []int) <-chan int {
out := make(chan int)
go func() {
for _, n := range numbers {
out <- n
}
//发送完成之后关闭
close(out)
}()
return out
@ -17,22 +52,8 @@ func squareNumber(in <-chan int) <-chan int {
for n := range in {
out <- n * n
}
//发送完成之后关闭
close(out)
}()
return out
}
func fanIn(input1, input2 <-chan int) <-chan int {
c := make(chan int)
go func() {
for {
select {
case s := <-input1:
c <- s
case s := <-input2:
c <- s
}
}
}()
return c
}

View File

@ -1,41 +0,0 @@
# Fan-In Messaging Patterns
===================================
Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination).
We can model fan-in using the Go channels.
```go
// Merge different channels in one channel
func Merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an send goroutine for each input channel in cs. send
// copies values from c to out until c is closed, then calls wg.Done.
send := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go send(c)
}
// Start a goroutine to close out once all the send goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
```
The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel.
Once all the output goroutines have been started, `Merge` a goroutine is started to close the main channel.

View File

@ -5,22 +5,30 @@ import (
"testing"
)
func TestFanIn(T *testing.T) {
randomNumbers := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
func TestMergeDataSeq(T *testing.T) {
//第一路输入源
dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
// generate the common channel with inputs
inputChan := generatePipeline(randomNumbers)
inputChan1 := generateNumbersPipeline(dataStreams1)
// Fan-out to 2 Go-routine
c1 := squareNumber(inputChan)
c2 := squareNumber(inputChan)
//第二路输入源
dataStreams2 := []int{2, 4, 6, 9, 1, 1, 2, 3, 7, 8}
inputChan2 := generateNumbersPipeline(dataStreams2)
c1 := squareNumber(inputChan1)
c2 := squareNumber(inputChan2)
//fanIn data for the squared numbers
out := Merge(c1, c2)
// Fan-in the resulting squared numbers
c := fanIn(c1, c2)
sum := 0
// Do the summation
for i := 0; i < len(randomNumbers); i++ {
sum += <-c
for c := range out {
sum += c
}
fmt.Printf("Total Sum of Squares: %d", sum)
fmt.Printf("Total Sum of Squares by FanIn : %d\n", sum)
}