finish fan_out pattern

This commit is contained in:
Edward 2020-05-07 17:41:16 +08:00
parent 53b2e22d73
commit a2f634c3b7
2 changed files with 76 additions and 79 deletions

View File

@ -2,6 +2,47 @@ package fanout
import "sync"
// Split2 多工作者重复分发,每个worker分发一波数据
// Split2 a channel into n channels that receive messages in a round-robin fashion.
func Split2(ch <-chan int, n int) []chan int {
cs := []chan int{}
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
var wg sync.WaitGroup
//Distributes one value to channels
distributeToChannels := func(ch <-chan int, cs []chan int) {
// // Close every channel when the execution ends.
//get a target from ch
val, ok := <-ch
if !ok {
return // channel closed
}
//send value to all channels
for _, c := range cs {
c <- val
}
wg.Done()
}
for i := 0; i < n; i++ {
wg.Add(1)
// a worker to distribute message
go distributeToChannels(ch, cs)
}
go func() {
wg.Wait()
for _, c := range cs {
close(c)
}
}()
return cs
}
// Split 重复分发数据为多份
// Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []chan int {
@ -46,51 +87,6 @@ func Split(ch <-chan int, n int) []chan int {
return cs
}
// Split2 多工作者重复分发
// Split2 a channel into n channels that receive messages in a round-robin fashion.
func Split2(ch <-chan int, n int) []chan int {
cs := make([]chan int, 0)
for i := 0; i < n; i++ {
cs = append(cs, make(chan int))
}
distributeToChannels := func(ch <-chan int, cs []chan int) {
// Close every channel when the execution ends.
defer func() {
for _, c := range cs {
close(c)
}
}()
var wg sync.WaitGroup
for {
//get a target from ch
select {
case val, ok := <-ch:
if !ok {
return
}
wg.Add(1)
go func(v int) {
defer wg.Done()
//send value to all channels
for _, c := range cs {
c <- val
}
}(val)
}
}
wg.Wait()
}
go distributeToChannels(ch, cs)
return cs
}
//Split3 随机分发到不同的目的地
//Split3 a channel into n channels that receive messages in a round-robin fashion.
func Split3(ch <-chan int, n int) []chan int {

View File

@ -6,7 +6,41 @@ import (
"testing"
)
//重复分发
//多工作者,重复分发
func TestFanOutDuplicateMultiWorkers(t *testing.T) {
//一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
//generator integer stream
inputChan := gen(dataStreams...)
// transfer to
ch := sq(inputChan)
// split it to 3 channel
// 重复分发
outArray := Split2(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup
wg.Add(length)
for i := 0; i < length; i++ {
go func(in <-chan int, index int) {
sum := 0
for item := range in {
sum += item
}
fmt.Println("worker:", index, sum)
wg.Done()
}(outArray[i], i)
}
wg.Wait()
}
//单个工作者,重复分发
func TestFanOutDuplicate(t *testing.T) {
//一路输入源
@ -77,39 +111,6 @@ func TestFanOutRandom(t *testing.T) {
wg.Wait()
}
//多工作者,重复分发
func TestFanOutDuplicateMultiWorkers(t *testing.T) {
//一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
//generator integer stream
inputChan := gen(dataStreams...)
// transfer to
ch := sq(inputChan)
// split it to 3 channel
// 重复分发
outArray := Split2(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup
wg.Add(length)
for i := 0; i < length; i++ {
go func(in <-chan int, index int) {
sum := 0
for item := range in {
sum += item
}
fmt.Println("worker:", index, sum)
wg.Done()
}(outArray[i], i)
}
wg.Wait()
}
func TestManualFanOutNumbersSeq(T *testing.T) {
//一路输入源