From a8f8670c1ef4c846a2177cef445be20d982d5e34 Mon Sep 17 00:00:00 2001 From: Tamer Tas Date: Mon, 18 Jan 2016 19:01:39 +0200 Subject: [PATCH] Add fan-in/out messaging patterns --- fan/fan_in.go | 33 +++++++++++++++++++++++++++++++++ fan/fan_out.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 fan/fan_in.go create mode 100644 fan/fan_out.go diff --git a/fan/fan_in.go b/fan/fan_in.go new file mode 100644 index 0000000..1d8c035 --- /dev/null +++ b/fan/fan_in.go @@ -0,0 +1,33 @@ +package fan + +import "sync" + +// In implements fan.In messaging pattern +// Merges different channels in one channel +func In(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + + out := make(chan int) + + // Start an send goroutine for each input channel in cs. send + // copies values from c to out until c is closed, then calls wg.Done. + send := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + + wg.Add(len(cs)) + for _, c := range cs { + go send(c) + } + + // Start a goroutine to close out once all the send goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} diff --git a/fan/fan_out.go b/fan/fan_out.go new file mode 100644 index 0000000..862306b --- /dev/null +++ b/fan/fan_out.go @@ -0,0 +1,40 @@ +package fan + +// Out implements fan.Out messaging pattern +// Split a channel into n channels that receive messages +// in a round-robin fashion. +func Out(ch <-chan int, n int) []<-chan int { + cs := make([]chan int) + for i := 0; i < n; i++ { + cs = append(cs, make(chan int)) + } + + // Distributes the work in a round robin fashion among the stated number + // of channels until the main channel has been closed. In that case, close + // all channels and return. + distributeToChannels := func(ch <-chan int, cs []chan<- int) { + // Close every channel when the execution ends. + defer func(cs []chan<- int) { + for _, c := range cs { + close(c) + } + }(cs) + + for { + for _, c := range cs { + select { + case val, ok := <-ch: + if !ok { + return + } + + c <- val + } + } + } + } + + go distributeToChannels(ch, cs) + + return cs +}