diff --git a/messaging/publish_subscribe.md b/messaging/publish_subscribe.md index 638363e..5aa5798 100644 --- a/messaging/publish_subscribe.md +++ b/messaging/publish_subscribe.md @@ -12,63 +12,59 @@ To accomplish this, an intermediary, called a "message broker" or "event bus", receives published messages, and then routes them on to subscribers. -There are three components **messages**, **topics**, **users**. +There are three components **messages**, **topics**, **subscriptions**. ```go type Message struct { // Contents } - type Subscription struct { - ch chan<- Message - - Inbox chan Message + closed bool + inbox chan Message } -func (s *Subscription) Publish(msg Message) error { - if _, ok := <-s.ch; !ok { - return errors.New("Topic has been closed") +func (s *Subscription) Next() (Message, error) { + if s.closed { + return Message{}, errors.New("subscription closed") } + + m, ok := <-s.inbox + if !ok { + return Message{}, errors.New("subscription closed") + } + + return m, nil +} - s.ch <- msg - - return nil +func (s *Subscription) Unsubscribe(Subscription) error { + s.closed = true + close(s.inbox) } ``` ```go type Topic struct { - Subscribers []Session + Subscribers []Subscription MessageHistory []Message } -func (t *Topic) Subscribe(uid uint64) (Subscription, error) { - // Get session and create one if it's the first - - // Add session to the Topic & MessageHistory - - // Create a subscription +func (t *Topic) Subscribe() (Subscription) { + return Subscription{inbox: make(chan Message)} } -func (t *Topic) Unsubscribe(Subscription) error { - // Implementation -} +func (t *Topic) Publish(msg Message) error { + for _, sub := range t.Subscribers { + if sub.closed { + continue + } + + go func() { + sub.inbox <- msg + }() + } -func (t *Topic) Delete() error { - // Implementation -} -``` - -```go -type User struct { - ID uint64 - Name string -} - -type Session struct { - User User - Timestamp time.Time + return nil } ```