From 8b0943a7b24e317e7f1f23d08232c05abb580a07 Mon Sep 17 00:00:00 2001 From: Tamer Tas Date: Tue, 12 Apr 2016 06:11:51 +0300 Subject: [PATCH] Add pub/sub messaging pattern --- README.md | 2 +- messaging/publish_subscribe.md | 80 ++++++++++++++++++++++++++++++++++ publish_subscribe.go | 62 -------------------------- 3 files changed, 81 insertions(+), 63 deletions(-) create mode 100644 messaging/publish_subscribe.md delete mode 100644 publish_subscribe.go diff --git a/README.md b/README.md index 79960a0..ce563ea 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ __Messaging Patterns__: | [Fan-In](fan/fan_in.go) | Funnels tasks to a work sink (e.g. server) | | [Fan-Out](fan/fan_out.go) | Distributes tasks amongs workers | | [Futures & Promises](futures_promises.go) | Acts as a place-holder of a result that is initally unknown for synchronization purposes | -| [Publish/Subscribe](publish_subscribe.go) | Passes information to a collection of recipients who subscribed to a topic | +| [Publish/Subscribe](messaging/publish_subscribe.md) | Passes information to a collection of recipients who subscribed to a topic | | [Push & Pull](push_pull.go) | Distributes messages to multiple workers, arranged in a pipeline | __Stability Patterns__: diff --git a/messaging/publish_subscribe.md b/messaging/publish_subscribe.md new file mode 100644 index 0000000..efb46b5 --- /dev/null +++ b/messaging/publish_subscribe.md @@ -0,0 +1,80 @@ +Publish & Subscribe Messaging Pattern +============ +Publish-Subscribe is a messaging pattern used to communicate messages between +different components without these components knowing anything about each other's identity. + +It is similar to the Observer behavioral design pattern. +The fundamental design principals of both Observer and Publish-Subscribe is the decoupling of +those interested in being informed about `Event Messages` from the informer (Observers or Publishers). +Meaning that you don't have to program the messages to be sent directly to specific receivers. + +To accomplish this, an intermediary, called a "message broker" or "event bus", +receives published messages, and then routes them on to subscribers. + + +There are three components **messages**, **topics**, **users**. + +```go +type Message struct { + // Contents +} + + +type Subscription struct { + ch chan<- Message + + Inbox chan Message +} + +func (s *Subscription) Publish(msg Message) error { + if _, ok := ch; !ok { + return errors.New("Topic has been closed") + } + + ch <- msg + + return nil +} +``` + +```go +type Topic struct { + Subscribers []Session + MessageHistory []Message +} + +func (t *Topic) Subscribe(uid uint64) (Subscription, error) { + // Get session and create one if it's the first + + // Add session to the Topic & MessageHistory + + // Create a subscription +} + +func (t *Topic) Unsubscribe(Subscription) error { + // Implementation +} + +func (t *Topic) Delete() error { + // Implementation +} +``` + +```go +type User struct { + ID uint64 + Name string +} + +type Session struct { + User User + Timestamp time.Time +} +``` + +Improvements +============ +Events can be published in a parallel fashion by utilizing stackless goroutines. + +Performance can be improved by dealing with straggler subscribers +by using a buffered inbox and you stop sending events once the inbox is full. diff --git a/publish_subscribe.go b/publish_subscribe.go deleted file mode 100644 index 441e01b..0000000 --- a/publish_subscribe.go +++ /dev/null @@ -1,62 +0,0 @@ -package main - -import ( - "errors" - "time" -) - -var ( - ErrTopicClosed = errors.New("Topic has been closed") -) - -// Message -type Message string - -// Topic -type Topic struct { - Subscribers []Authentication - MessageHistory []struct { - Author string - Message Message - Timestamp time.Time - } -} - -// Subscribe -func (t *Topic) Subscribe(Authentication) (Subscription, error) { - // Implementation -} - -// Unsubscribe -func (t *Topic) Unsubscribe(Subscription) error { - // Implementation -} - -// Delete -func (t *Topic) Delete() error { - // Implementation -} - -type Subscription struct { - ch chan<- Message - - Inbox chan Message -} - -// Publish -func (s *Subscription) Publish(msg Message) error { - if _, ok := ch; !ok { - return ErrTopicClosed - } - - ch <- msg - - return nil -} - -// Authentication -type Authentication struct { -} - -func main() { -}