basic finish fanout pattern

This commit is contained in:
Edward 2020-05-07 16:52:03 +08:00
parent 0aed8d3bcb
commit 53b2e22d73
4 changed files with 130 additions and 31 deletions

View File

@ -5,7 +5,7 @@ import (
"testing" "testing"
) )
func TestFanInOutNumbersSeq(T *testing.T) { func TestFanInOutNumbersSeq(t *testing.T) {
//一路输入源 //一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}

View File

@ -5,7 +5,7 @@ import (
"testing" "testing"
) )
func TestFanInNumbersSeq(T *testing.T) { func TestFanInNumbersSeq(t *testing.T) {
//第一路输入源 //第一路输入源
dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}

View File

@ -1,22 +1,28 @@
package fanout package fanout
import "sync"
// Split 重复分发数据为多份 // Split 重复分发数据为多份
// Split a channel into n channels that receive messages in a round-robin fashion. // Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []chan int { func Split(ch <-chan int, n int) []chan int {
//get a queue of chan //get a queue of chan
cs := make([]chan int, n) //cs := make([]chan int, n) //创建了个chan 数组但是空的
// Distributes the work in a round robin fashion among the stated number cs := []chan int{}
// of channels until the main channel has been closed. In that case, close for i := 0; i < n; i++ {
// all channels and return. cs = append(cs, make(chan int))
}
//Distributes the work in a round robin fashion among the stated number of channels
//until the main channel has been closed. In that case, close all channels and return.
distributeToChannels := func(ch <-chan int, cs []chan int) { distributeToChannels := func(ch <-chan int, cs []chan int) {
// Close every channel when the execution ends. // Close every channel when the execution ends.
defer func(cs []chan int) { defer func() {
for _, c := range cs { for _, c := range cs {
close(c) close(c)
} }
}(cs) }()
//this version will block //this version will block
for { for {
@ -24,7 +30,7 @@ func Split(ch <-chan int, n int) []chan int {
select { select {
case val, ok := <-ch: case val, ok := <-ch:
if !ok { if !ok {
return return // channel closed
} }
//send value to all channels //send value to all channels
for _, c := range cs { for _, c := range cs {
@ -42,47 +48,68 @@ func Split(ch <-chan int, n int) []chan int {
// Split2 多工作者重复分发 // Split2 多工作者重复分发
// Split2 a channel into n channels that receive messages in a round-robin fashion. // Split2 a channel into n channels that receive messages in a round-robin fashion.
func Split2(ch <-chan int, n int) []<-chan int { func Split2(ch <-chan int, n int) []chan int {
cs := make([]chan int, n) cs := make([]chan int, 0)
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
//this version will block distributeToChannels := func(ch <-chan int, cs []chan int) {
// Close every channel when the execution ends.
defer func() {
for _, c := range cs {
close(c)
}
}()
var wg sync.WaitGroup
for { for {
//get a target from ch //get a target from ch
select { select {
case val, ok := <-ch: case val, ok := <-ch:
if !ok { if !ok {
return cs return
} }
wg.Add(1)
go func(v int) {
defer wg.Done()
//send value to all channels //send value to all channels
for _, c := range cs { for _, c := range cs {
go func(tmpV int) { c <- val
c <- tmpV }
}(val) }(val)
} }
} }
wg.Wait()
} }
go distributeToChannels(ch, cs)
return cs return cs
} }
//Split3 随机分发到不同的目的地 //Split3 随机分发到不同的目的地
//Split3 a channel into n channels that receive messages in a round-robin fashion. //Split3 a channel into n channels that receive messages in a round-robin fashion.
func Split3(ch <-chan int, n int) []<-chan int { func Split3(ch <-chan int, n int) []chan int {
cs := make([]chan int, n) cs := make([]chan int, 0)
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
// Distributes the work in a round robin fashion among the stated number // Distributes the work in a round robin fashion among the stated number
// of channels until the main channel has been closed. In that case, close // of channels until the main channel has been closed. In that case, close
// all channels and return. // all channels and return.
distributeToChannels := func(ch <-chan int, cs []chan<- int) { distributeToChannels := func(ch <-chan int, cs []chan int) {
// Close every channel when the execution ends. // Close every channel when the execution ends.
defer func(cs []chan<- int) { defer func() {
for _, c := range cs { for _, c := range cs {
close(c) close(c)
} }
}(cs) }()
for { for {
for _, c := range cs { for _, c := range cs {
@ -91,7 +118,6 @@ func Split3(ch <-chan int, n int) []<-chan int {
if !ok { if !ok {
return return
} }
c <- val c <- val
} }
} }

View File

@ -6,7 +6,8 @@ import (
"testing" "testing"
) )
func TestMultiFanOutNumbersSeq(T *testing.T) { //重复分发
func TestFanOutDuplicate(t *testing.T) {
//一路输入源 //一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
@ -20,9 +21,11 @@ func TestMultiFanOutNumbersSeq(T *testing.T) {
// 重复分发 // 重复分发
outArray := Split(ch, 3) outArray := Split(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(outArray)) wg.Add(length)
for i := 0; i < len(outArray); i++ { for i := 0; i < length; i++ {
go func(in <-chan int, index int) { go func(in <-chan int, index int) {
sum := 0 sum := 0
@ -37,6 +40,76 @@ func TestMultiFanOutNumbersSeq(T *testing.T) {
wg.Wait() wg.Wait()
} }
//随机分发
// worker: 2 11245
// worker: 0 14988
// worker: 1 10117
func TestFanOutRandom(t *testing.T) {
//一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
//generator integer stream
inputChan := gen(dataStreams...)
// transfer to
ch := sq(inputChan)
// split it to 3 channel
// 重复分发
outArray := Split3(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup
wg.Add(length)
for i := 0; i < length; i++ {
go func(in <-chan int, index int) {
sum := 0
for item := range in {
sum += item
}
fmt.Println("worker:", index, sum)
wg.Done()
}(outArray[i], i)
}
wg.Wait()
}
//多工作者,重复分发
func TestFanOutDuplicateMultiWorkers(t *testing.T) {
//一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
//generator integer stream
inputChan := gen(dataStreams...)
// transfer to
ch := sq(inputChan)
// split it to 3 channel
// 重复分发
outArray := Split2(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup
wg.Add(length)
for i := 0; i < length; i++ {
go func(in <-chan int, index int) {
sum := 0
for item := range in {
sum += item
}
fmt.Println("worker:", index, sum)
wg.Done()
}(outArray[i], i)
}
wg.Wait()
}
func TestManualFanOutNumbersSeq(T *testing.T) { func TestManualFanOutNumbersSeq(T *testing.T) {
//一路输入源 //一路输入源