From 4bd71e70e3530735f470f9328b38c16ddb325ad0 Mon Sep 17 00:00:00 2001 From: Tamer Tas Date: Tue, 12 Apr 2016 06:25:19 +0300 Subject: [PATCH] Rewrite fan-in/out messaging patterns --- README.md | 4 +-- fan/fan_in.go | 33 --------------------- messaging/fan_in.md | 40 ++++++++++++++++++++++++++ fan/fan_out.go => messaging/fan_out.md | 17 +++++++---- 4 files changed, 54 insertions(+), 40 deletions(-) delete mode 100644 fan/fan_in.go create mode 100644 messaging/fan_in.md rename fan/fan_out.go => messaging/fan_out.md (55%) diff --git a/README.md b/README.md index ce563ea..3d10126 100644 --- a/README.md +++ b/README.md @@ -77,8 +77,8 @@ __Messaging Patterns__: | Pattern | Description | |:-------:| ----------- | -| [Fan-In](fan/fan_in.go) | Funnels tasks to a work sink (e.g. server) | -| [Fan-Out](fan/fan_out.go) | Distributes tasks amongs workers | +| [Fan-In](messaging/fan_in.md) | Funnels tasks to a work sink (e.g. server) | +| [Fan-Out](messaging/fan_out.md) | Distributes tasks amongs workers (e.g. producer) | | [Futures & Promises](futures_promises.go) | Acts as a place-holder of a result that is initally unknown for synchronization purposes | | [Publish/Subscribe](messaging/publish_subscribe.md) | Passes information to a collection of recipients who subscribed to a topic | | [Push & Pull](push_pull.go) | Distributes messages to multiple workers, arranged in a pipeline | diff --git a/fan/fan_in.go b/fan/fan_in.go deleted file mode 100644 index 1d8c035..0000000 --- a/fan/fan_in.go +++ /dev/null @@ -1,33 +0,0 @@ -package fan - -import "sync" - -// In implements fan.In messaging pattern -// Merges different channels in one channel -func In(cs ...<-chan int) <-chan int { - var wg sync.WaitGroup - - out := make(chan int) - - // Start an send goroutine for each input channel in cs. send - // copies values from c to out until c is closed, then calls wg.Done. - send := func(c <-chan int) { - for n := range c { - out <- n - } - wg.Done() - } - - wg.Add(len(cs)) - for _, c := range cs { - go send(c) - } - - // Start a goroutine to close out once all the send goroutines are - // done. This must start after the wg.Add call. - go func() { - wg.Wait() - close(out) - }() - return out -} diff --git a/messaging/fan_in.md b/messaging/fan_in.md new file mode 100644 index 0000000..d4b3991 --- /dev/null +++ b/messaging/fan_in.md @@ -0,0 +1,40 @@ +Fan-In Messaging Patterns +=================================== +Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination). + +We can model fan-in using the Go channels. + +```go +// Merge different channels in one channel +func Merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + + out := make(chan int) + + // Start an send goroutine for each input channel in cs. send + // copies values from c to out until c is closed, then calls wg.Done. + send := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + + wg.Add(len(cs)) + for _, c := range cs { + go send(c) + } + + // Start a goroutine to close out once all the send goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} +``` + +The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel. + +Once all the output goroutines have been started, `Merge` a goroutine is started to close the main channel. diff --git a/fan/fan_out.go b/messaging/fan_out.md similarity index 55% rename from fan/fan_out.go rename to messaging/fan_out.md index 862306b..3052319 100644 --- a/fan/fan_out.go +++ b/messaging/fan_out.md @@ -1,9 +1,12 @@ -package fan +Fan-Out Messaging Pattern +========================= +Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination). -// Out implements fan.Out messaging pattern -// Split a channel into n channels that receive messages -// in a round-robin fashion. -func Out(ch <-chan int, n int) []<-chan int { +We can model fan-out using the Go channels. + +```go +// Split a channel into n channels that receive messages in a round-robin fashion. +func Split(ch <-chan int, n int) []<-chan int { cs := make([]chan int) for i := 0; i < n; i++ { cs = append(cs, make(chan int)) @@ -38,3 +41,7 @@ func Out(ch <-chan int, n int) []<-chan int { return cs } +``` + +The `Split` function converts a single channel into a list of channels by using +a goroutine to copy received values to channels in the list in a round-robin fashion.