From 4d6d56c3387715c1346adbad9e5f3a1f85c86fff Mon Sep 17 00:00:00 2001 From: Edward Date: Thu, 7 May 2020 12:20:36 +0800 Subject: [PATCH] add a fan_in_out test --- gomore/04_fan_in/fan_in_test.go | 2 +- gomore/05_fan_out/fan_out_in.go | 276 --------------------------- gomore/05_fan_out/fan_out_in_test.go | 140 ++++++++++++++ 3 files changed, 141 insertions(+), 277 deletions(-) delete mode 100644 gomore/05_fan_out/fan_out_in.go create mode 100644 gomore/05_fan_out/fan_out_in_test.go diff --git a/gomore/04_fan_in/fan_in_test.go b/gomore/04_fan_in/fan_in_test.go index 2717f38..a34da63 100644 --- a/gomore/04_fan_in/fan_in_test.go +++ b/gomore/04_fan_in/fan_in_test.go @@ -5,7 +5,7 @@ import ( "testing" ) -func TestMergeDataSeq(T *testing.T) { +func TestFanInNumbersSeq(T *testing.T) { //第一路输入源 dataStreams1 := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23} diff --git a/gomore/05_fan_out/fan_out_in.go b/gomore/05_fan_out/fan_out_in.go deleted file mode 100644 index 88e65d0..0000000 --- a/gomore/05_fan_out/fan_out_in.go +++ /dev/null @@ -1,276 +0,0 @@ -package fanout - -import ( - "context" - "fmt" - "log" - "sync" - "time" -) - -func run() { - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - var errcList []chan error - - // res := fanin(fanout(generateNumbers(ctx), squareNumbers)) - - nChan, errc, err := generateNumbers(ctx) - if err != nil { - log.Fatal(err) - } - errcList = append(errcList, errc) - - fChanl, errcl, err := fanout(ctx, nChan, squareNumbers) - errcList = append(errcList, errcl...) - - res, errRes, err := fanin(fChanl, errcl) - - go func() { - for { - select { - case r := <-res: - v, ok := r.(int) - if !ok { - // TODO - _ = ok - } - fmt.Println(v) - case e := <-errRes: - fmt.Println(e) - } - } - }() - - fmt.Println("finished") - time.Sleep(10 * time.Duration(time.Second)) -} - -type Op func(context.Context, chan interface{}) (chan interface{}, chan error, error) - -func generateNumbers(ctx context.Context) (chan interface{}, chan error, error) { - fmt.Println("generateNumbers called") - out := make(chan interface{}, 100) - errc := make(chan error, 100) - - go func() { - defer close(out) - defer close(errc) - - for i := 0; i < 10; i++ { - fmt.Println("gen: ", i) - out <- i - } - }() - - return out, errc, nil -} - -func fanout(ctx context.Context, in chan interface{}, fct Op) ([]chan interface{}, []chan error, error) { - fmt.Println("fanout called") - var out []chan interface{} - var errcl []chan error - - for i := 0; i < 5; i++ { - res, errc, err := fct(ctx, in) - // Todo: manage error - _ = err - out = append(out, res) - errcl = append(errcl, errc) - } - - return out, errcl, nil -} - -func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) { - fmt.Println("squareNumbers called") - out := make(chan interface{}, 100) - errc := make(chan error, 100) - - go func() { - for v := range in { - num, _ := v.(int) - - squared := num * num - - select { - case out <- squared: - case <-ctx.Done(): - fmt.Println("ctx done") - return - } - } - close(out) - }() - - return out, errc, nil -} - -func fanin(ins []chan interface{}, errcl []chan error) (chan interface{}, chan error, error) { - fmt.Println("fanin called") - out := make(chan interface{}, 100) - errcout := make(chan error, 100) - var waitgroup sync.WaitGroup - - length := len(ins) + len(errcl) - waitgroup.Add(length) - - for _, v := range ins { - go func(w chan interface{}) { - for val := range w { - out <- val - } - waitgroup.Done() - }(v) - } - - for _, e := range errcl { - go func(errc chan error) { - for v := range errc { - errcout <- v - } - waitgroup.Done() - }(e) - } - - go func() { - waitgroup.Wait() - close(out) - close(errcout) - }() - - return out, errcout, nil -} -func main() { - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - var errcList []chan error - - // res := fanin(fanout(generateNumbers(ctx), squareNumbers)) - - nChan, errc, err := generateNumbers(ctx) - if err != nil { - log.Fatal(err) - } - errcList = append(errcList, errc) - - fChanl, errcl, err := fanout(ctx, nChan, squareNumbers) - errcList = append(errcList, errcl...) - - res, errRes, err := fanin(fChanl, errcl) - - go func() { - for { - select { - case r := <-res: - v, ok := r.(int) - if !ok { - // TODO - _ = ok - } - fmt.Println(v) - case e := <-errRes: - fmt.Println(e) - } - } - }() - - fmt.Println("finished") - time.Sleep(10 * time.Duration(time.Second)) -} - -type Op func(context.Context, chan interface{}) (chan interface{}, chan error, error) - -func generateNumbers(ctx context.Context) (chan interface{}, chan error, error) { - fmt.Println("generateNumbers called") - out := make(chan interface{}, 100) - errc := make(chan error, 100) - - go func() { - defer close(out) - defer close(errc) - - for i := 0; i < 10; i++ { - fmt.Println("gen: ", i) - out <- i - } - }() - - return out, errc, nil -} - -func fanout(ctx context.Context, in chan interface{}, fct Op) ([]chan interface{}, []chan error, error) { - fmt.Println("fanout called") - var out []chan interface{} - var errcl []chan error - - for i := 0; i < 5; i++ { - res, errc, err := fct(ctx, in) - // Todo: manage error - _ = err - out = append(out, res) - errcl = append(errcl, errc) - } - - return out, errcl, nil -} - -func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) { - fmt.Println("squareNumbers called") - out := make(chan interface{}, 100) - errc := make(chan error, 100) - - go func() { - for v := range in { - num, _ := v.(int) - - squared := num * num - - select { - case out <- squared: - case <-ctx.Done(): - fmt.Println("ctx done") - return - } - } - close(out) - }() - - return out, errc, nil -} - -func fanin(ins []chan interface{}, errcl []chan error) (chan interface{}, chan error, error) { - fmt.Println("fanin called") - out := make(chan interface{}, 100) - errcout := make(chan error, 100) - var waitgroup sync.WaitGroup - - length := len(ins) + len(errcl) - waitgroup.Add(length) - - for _, v := range ins { - go func(w chan interface{}) { - for val := range w { - out <- val - } - waitgroup.Done() - }(v) - } - - for _, e := range errcl { - go func(errc chan error) { - for v := range errc { - errcout <- v - } - waitgroup.Done() - }(e) - } - - go func() { - waitgroup.Wait() - close(out) - close(errcout) - }() - - return out, errcout, nil -} diff --git a/gomore/05_fan_out/fan_out_in_test.go b/gomore/05_fan_out/fan_out_in_test.go new file mode 100644 index 0000000..d9bf824 --- /dev/null +++ b/gomore/05_fan_out/fan_out_in_test.go @@ -0,0 +1,140 @@ +package fanout + +import ( + "context" + "fmt" + "log" + "sync" + "testing" + "time" +) + +func TestFanInOut(T *testing.T) { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + var errList []chan error + + nChan, err, err := generateNumbers(ctx) + if err != nil { + log.Fatal(err) + } + errList = append(errList, err) + + fChannel, errList, err := fanout(ctx, nChan, squareNumbers) + errList = append(errList, errList...) + + res, errRes, err := fanin(fChannel, errList) + + go func() { + for { + select { + case r := <-res: + v, ok := r.(int) + if !ok { + // TODO + _ = ok + } + fmt.Println(v) + case e := <-errRes: + fmt.Println(e) + } + } + }() + + fmt.Println("finished") + time.Sleep(10 * time.Duration(time.Second)) +} + +type Op func(context.Context, chan interface{}) (chan interface{}, chan error, error) + +func generateNumbers(ctx context.Context) (chan interface{}, chan error, error) { + fmt.Println("generateNumbers called") + out := make(chan interface{}, 100) + err := make(chan error, 100) + + go func() { + defer close(out) + defer close(err) + + for i := 0; i < 10; i++ { + fmt.Println("gen: ", i) + out <- i + } + }() + + return out, err, nil +} + +func fanout(ctx context.Context, in chan interface{}, fct Op) ([]chan interface{}, []chan error, error) { + fmt.Println("fanout called") + var out []chan interface{} + var errList []chan error + + for i := 0; i < 5; i++ { + res, err, err := fct(ctx, in) + // Todo: manage error + _ = err + out = append(out, res) + errList = append(errList, err) + } + + return out, errList, nil +} + +func squareNumbers(ctx context.Context, in chan interface{}) (chan interface{}, chan error, error) { + fmt.Println("squareNumbers called") + out := make(chan interface{}, 100) + err := make(chan error, 100) + + go func() { + for v := range in { + num, _ := v.(int) + squared := num * num + select { + case out <- squared: + case <-ctx.Done(): + fmt.Println("ctx done") + return + } + } + close(out) + }() + + return out, err, nil +} + +func fanin(ins []chan interface{}, errList []chan error) (chan interface{}, chan error, error) { + fmt.Println("fanin called") + out := make(chan interface{}, 100) + errout := make(chan error, 100) + var waitgroup sync.WaitGroup + + length := len(ins) + len(errList) + waitgroup.Add(length) + + for _, v := range ins { + go func(w chan interface{}) { + for val := range w { + out <- val + } + waitgroup.Done() + }(v) + } + + for _, e := range errList { + go func(err chan error) { + for v := range err { + errout <- v + } + waitgroup.Done() + }(e) + } + + go func() { + waitgroup.Wait() + close(out) + close(errout) + }() + + return out, errout, nil +}