add a fan_in_out test

This commit is contained in:
Edward 2020-05-07 12:20:36 +08:00
parent 80cb3ddd5e
commit 4d6d56c338
3 changed files with 141 additions and 277 deletions

View File

@ -5,7 +5,7 @@ import (
"testing"
)
func TestMergeDataSeq(T *testing.T) {
func TestFanInNumbersSeq(T *testing.T) {
//第一路输入源
dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}

View File

@ -1,276 +0,0 @@
package fanout
import (
"context"
"fmt"
"log"
"sync"
"time"
)
func run() {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
var errcList []chan error
// res := fanin(fanout(generateNumbers(ctx), squareNumbers))
nChan, errc, err := generateNumbers(ctx)
if err != nil {
log.Fatal(err)
}
errcList = append(errcList, errc)
fChanl, errcl, err := fanout(ctx, nChan, squareNumbers)
errcList = append(errcList, errcl...)
res, errRes, err := fanin(fChanl, errcl)
go func() {
for {
select {
case r := <-res:
v, ok := r.(int)
if !ok {
// TODO
_ = ok
}
fmt.Println(v)
case e := <-errRes:
fmt.Println(e)
}
}
}()
fmt.Println("finished")
time.Sleep(10 * time.Duration(time.Second))
}
type Op func(context.Context, chan interface{}) (chan interface{}, chan error, error)
func generateNumbers(ctx context.Context) (chan interface{}, chan error, error) {
fmt.Println("generateNumbers called")
out := make(chan interface{}, 100)
errc := make(chan error, 100)
go func() {
defer close(out)
defer close(errc)
for i := 0; i < 10; i++ {
fmt.Println("gen: ", i)
out <- i
}
}()
return out, errc, nil
}
func fanout(ctx context.Context, in chan interface{}, fct Op) ([]chan interface{}, []chan error, error) {
fmt.Println("fanout called")
var out []chan interface{}
var errcl []chan error
for i := 0; i < 5; i++ {
res, errc, err := fct(ctx, in)
// Todo: manage error
_ = err
out = append(out, res)
errcl = append(errcl, errc)
}
return out, errcl, nil
}
func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) {
fmt.Println("squareNumbers called")
out := make(chan interface{}, 100)
errc := make(chan error, 100)
go func() {
for v := range in {
num, _ := v.(int)
squared := num * num
select {
case out <- squared:
case <-ctx.Done():
fmt.Println("ctx done")
return
}
}
close(out)
}()
return out, errc, nil
}
func fanin(ins []chan interface{}, errcl []chan error) (chan interface{}, chan error, error) {
fmt.Println("fanin called")
out := make(chan interface{}, 100)
errcout := make(chan error, 100)
var waitgroup sync.WaitGroup
length := len(ins) + len(errcl)
waitgroup.Add(length)
for _, v := range ins {
go func(w chan interface{}) {
for val := range w {
out <- val
}
waitgroup.Done()
}(v)
}
for _, e := range errcl {
go func(errc chan error) {
for v := range errc {
errcout <- v
}
waitgroup.Done()
}(e)
}
go func() {
waitgroup.Wait()
close(out)
close(errcout)
}()
return out, errcout, nil
}
func main() {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
var errcList []chan error
// res := fanin(fanout(generateNumbers(ctx), squareNumbers))
nChan, errc, err := generateNumbers(ctx)
if err != nil {
log.Fatal(err)
}
errcList = append(errcList, errc)
fChanl, errcl, err := fanout(ctx, nChan, squareNumbers)
errcList = append(errcList, errcl...)
res, errRes, err := fanin(fChanl, errcl)
go func() {
for {
select {
case r := <-res:
v, ok := r.(int)
if !ok {
// TODO
_ = ok
}
fmt.Println(v)
case e := <-errRes:
fmt.Println(e)
}
}
}()
fmt.Println("finished")
time.Sleep(10 * time.Duration(time.Second))
}
type Op func(context.Context, chan interface{}) (chan interface{}, chan error, error)
func generateNumbers(ctx context.Context) (chan interface{}, chan error, error) {
fmt.Println("generateNumbers called")
out := make(chan interface{}, 100)
errc := make(chan error, 100)
go func() {
defer close(out)
defer close(errc)
for i := 0; i < 10; i++ {
fmt.Println("gen: ", i)
out <- i
}
}()
return out, errc, nil
}
func fanout(ctx context.Context, in chan interface{}, fct Op) ([]chan interface{}, []chan error, error) {
fmt.Println("fanout called")
var out []chan interface{}
var errcl []chan error
for i := 0; i < 5; i++ {
res, errc, err := fct(ctx, in)
// Todo: manage error
_ = err
out = append(out, res)
errcl = append(errcl, errc)
}
return out, errcl, nil
}
func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) {
fmt.Println("squareNumbers called")
out := make(chan interface{}, 100)
errc := make(chan error, 100)
go func() {
for v := range in {
num, _ := v.(int)
squared := num * num
select {
case out <- squared:
case <-ctx.Done():
fmt.Println("ctx done")
return
}
}
close(out)
}()
return out, errc, nil
}
func fanin(ins []chan interface{}, errcl []chan error) (chan interface{}, chan error, error) {
fmt.Println("fanin called")
out := make(chan interface{}, 100)
errcout := make(chan error, 100)
var waitgroup sync.WaitGroup
length := len(ins) + len(errcl)
waitgroup.Add(length)
for _, v := range ins {
go func(w chan interface{}) {
for val := range w {
out <- val
}
waitgroup.Done()
}(v)
}
for _, e := range errcl {
go func(errc chan error) {
for v := range errc {
errcout <- v
}
waitgroup.Done()
}(e)
}
go func() {
waitgroup.Wait()
close(out)
close(errcout)
}()
return out, errcout, nil
}

View File

@ -0,0 +1,140 @@
package fanout
import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"
)
func TestFanInOut(T *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
var errList []chan error
nChan, err, err := generateNumbers(ctx)
if err != nil {
log.Fatal(err)
}
errList = append(errList, err)
fChannel, errList, err := fanout(ctx, nChan, squareNumbers)
errList = append(errList, errList...)
res, errRes, err := fanin(fChannel, errList)
go func() {
for {
select {
case r := <-res:
v, ok := r.(int)
if !ok {
// TODO
_ = ok
}
fmt.Println(v)
case e := <-errRes:
fmt.Println(e)
}
}
}()
fmt.Println("finished")
time.Sleep(10 * time.Duration(time.Second))
}
type Op func(context.Context, chan interface{}) (chan interface{}, chan error, error)
func generateNumbers(ctx context.Context) (chan interface{}, chan error, error) {
fmt.Println("generateNumbers called")
out := make(chan interface{}, 100)
err := make(chan error, 100)
go func() {
defer close(out)
defer close(err)
for i := 0; i < 10; i++ {
fmt.Println("gen: ", i)
out <- i
}
}()
return out, err, nil
}
func fanout(ctx context.Context, in chan interface{}, fct Op) ([]chan interface{}, []chan error, error) {
fmt.Println("fanout called")
var out []chan interface{}
var errList []chan error
for i := 0; i < 5; i++ {
res, err, err := fct(ctx, in)
// Todo: manage error
_ = err
out = append(out, res)
errList = append(errList, err)
}
return out, errList, nil
}
func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) {
fmt.Println("squareNumbers called")
out := make(chan interface{}, 100)
err := make(chan error, 100)
go func() {
for v := range in {
num, _ := v.(int)
squared := num * num
select {
case out <- squared:
case <-ctx.Done():
fmt.Println("ctx done")
return
}
}
close(out)
}()
return out, err, nil
}
func fanin(ins []chan interface{}, errList []chan error) (chan interface{}, chan error, error) {
fmt.Println("fanin called")
out := make(chan interface{}, 100)
errout := make(chan error, 100)
var waitgroup sync.WaitGroup
length := len(ins) + len(errList)
waitgroup.Add(length)
for _, v := range ins {
go func(w chan interface{}) {
for val := range w {
out <- val
}
waitgroup.Done()
}(v)
}
for _, e := range errList {
go func(err chan error) {
for v := range err {
errout <- v
}
waitgroup.Done()
}(e)
}
go func() {
waitgroup.Wait()
close(out)
close(errout)
}()
return out, errout, nil
}