fix readme

This commit is contained in:
Edward 2020-05-07 12:18:03 +08:00
parent a3638a1084
commit 9cb8114cce
3 changed files with 57 additions and 54 deletions

View File

@ -1,4 +1,5 @@
Fan-In Messaging Patterns # Fan-In Messaging Patterns
=================================== ===================================
Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination). Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination).
@ -7,34 +8,34 @@ We can model fan-in using the Go channels.
```go ```go
// Merge different channels in one channel // Merge different channels in one channel
func Merge(cs ...<-chan int) <-chan int { func Merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup var wg sync.WaitGroup
out := make(chan int) out := make(chan int)
// Start an send goroutine for each input channel in cs. send // Start an send goroutine for each input channel in cs. send
// copies values from c to out until c is closed, then calls wg.Done. // copies values from c to out until c is closed, then calls wg.Done.
send := func(c <-chan int) { send := func(c <-chan int) {
for n := range c { for n := range c {
out <- n out <- n
} }
wg.Done() wg.Done()
} }
wg.Add(len(cs)) wg.Add(len(cs))
for _, c := range cs { for _, c := range cs {
go send(c) go send(c)
} }
// Start a goroutine to close out once all the send goroutines are // Start a goroutine to close out once all the send goroutines are
// done. This must start after the wg.Add call. // done. This must start after the wg.Add call.
go func() { go func() {
wg.Wait() wg.Wait()
close(out) close(out)
}() }()
return out return out
} }
``` ```
The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel. The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel.
Once all the output goroutines have been started, `Merge` a goroutine is started to close the main channel. Once all the output goroutine have been started, `Merge` a goroutine is started to close the main channel.

View File

@ -1,4 +1,5 @@
Fan-Out Messaging Pattern # Fan-Out Messaging Pattern
========================= =========================
Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination). Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination).
@ -7,39 +8,39 @@ We can model fan-out using the Go channels.
```go ```go
// Split a channel into n channels that receive messages in a round-robin fashion. // Split a channel into n channels that receive messages in a round-robin fashion.
func Split(ch <-chan int, n int) []<-chan int { func Split(ch <-chan int, n int) []<-chan int {
cs := make([]chan int) cs := make([]chan int)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
cs = append(cs, make(chan int)) cs = append(cs, make(chan int))
} }
// Distributes the work in a round robin fashion among the stated number // Distributes the work in a round robin fashion among the stated number
// of channels until the main channel has been closed. In that case, close // of channels until the main channel has been closed. In that case, close
// all channels and return. // all channels and return.
distributeToChannels := func(ch <-chan int, cs []chan<- int) { distributeToChannels := func(ch <-chan int, cs []chan<- int) {
// Close every channel when the execution ends. // Close every channel when the execution ends.
defer func(cs []chan<- int) { defer func(cs []chan<- int) {
for _, c := range cs { for _, c := range cs {
close(c) close(c)
} }
}(cs) }(cs)
for { for {
for _, c := range cs { for _, c := range cs {
select { select {
case val, ok := <-ch: case val, ok := <-ch:
if !ok { if !ok {
return return
} }
c <- val c <- val
} }
} }
} }
} }
go distributeToChannels(ch, cs) go distributeToChannels(ch, cs)
return cs return cs
} }
``` ```

View File

@ -2,9 +2,10 @@ package messaging
import ( import (
"context" "context"
"go.uber.org/zap"
"sync" "sync"
"sync/atomic" "sync/atomic"
"go.uber.org/zap"
) )
var ( var (