diff --git a/messaging/fan_in.md b/messaging/fan_in.md index d4b3991..16fc630 100644 --- a/messaging/fan_in.md +++ b/messaging/fan_in.md @@ -1,4 +1,5 @@ -Fan-In Messaging Patterns +# Fan-In Messaging Patterns + =================================== Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination). @@ -7,34 +8,34 @@ We can model fan-in using the Go channels. ```go // Merge different channels in one channel func Merge(cs ...<-chan int) <-chan int { - var wg sync.WaitGroup + var wg sync.WaitGroup - out := make(chan int) + out := make(chan int) - // Start an send goroutine for each input channel in cs. send - // copies values from c to out until c is closed, then calls wg.Done. - send := func(c <-chan int) { - for n := range c { - out <- n - } - wg.Done() - } + // Start an send goroutine for each input channel in cs. send + // copies values from c to out until c is closed, then calls wg.Done. + send := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } - wg.Add(len(cs)) - for _, c := range cs { - go send(c) - } + wg.Add(len(cs)) + for _, c := range cs { + go send(c) + } - // Start a goroutine to close out once all the send goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - return out + // Start a goroutine to close out once all the send goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out } ``` The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel. -Once all the output goroutines have been started, `Merge` a goroutine is started to close the main channel. +Once all the output goroutine have been started, `Merge` a goroutine is started to close the main channel. diff --git a/messaging/fan_out.md b/messaging/fan_out.md index 3052319..c49ea8b 100644 --- a/messaging/fan_out.md +++ b/messaging/fan_out.md @@ -1,4 +1,5 @@ -Fan-Out Messaging Pattern +# Fan-Out Messaging Pattern + ========================= Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination). @@ -7,41 +8,41 @@ We can model fan-out using the Go channels. ```go // Split a channel into n channels that receive messages in a round-robin fashion. func Split(ch <-chan int, n int) []<-chan int { - cs := make([]chan int) - for i := 0; i < n; i++ { - cs = append(cs, make(chan int)) - } + cs := make([]chan int) + for i := 0; i < n; i++ { + cs = append(cs, make(chan int)) + } - // Distributes the work in a round robin fashion among the stated number - // of channels until the main channel has been closed. In that case, close - // all channels and return. - distributeToChannels := func(ch <-chan int, cs []chan<- int) { - // Close every channel when the execution ends. - defer func(cs []chan<- int) { - for _, c := range cs { - close(c) - } - }(cs) + // Distributes the work in a round robin fashion among the stated number + // of channels until the main channel has been closed. In that case, close + // all channels and return. + distributeToChannels := func(ch <-chan int, cs []chan<- int) { + // Close every channel when the execution ends. + defer func(cs []chan<- int) { + for _, c := range cs { + close(c) + } + }(cs) - for { - for _, c := range cs { - select { - case val, ok := <-ch: - if !ok { - return - } + for { + for _, c := range cs { + select { + case val, ok := <-ch: + if !ok { + return + } - c <- val - } - } - } - } + c <- val + } + } + } + } - go distributeToChannels(ch, cs) + go distributeToChannels(ch, cs) - return cs + return cs } ``` -The `Split` function converts a single channel into a list of channels by using +The `Split` function converts a single channel into a list of channels by using a goroutine to copy received values to channels in the list in a round-robin fashion. diff --git a/messaging/fanout.go b/messaging/fanout.go index 3d56805..74169d8 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -2,9 +2,10 @@ package messaging import ( "context" - "go.uber.org/zap" "sync" "sync/atomic" + + "go.uber.org/zap" ) var (