1
0
mirror of https://github.com/tmrts/go-patterns.git synced 2024-11-22 04:56:09 +03:00

Update publish_subscribe.md

This commit is contained in:
Robert McLeod 2020-09-28 17:44:56 +13:00 committed by GitHub
parent f978e42036
commit 0dd35222f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -12,63 +12,59 @@ To accomplish this, an intermediary, called a "message broker" or "event bus",
receives published messages, and then routes them on to subscribers.
There are three components **messages**, **topics**, **users**.
There are three components **messages**, **topics**, **subscriptions**.
```go
type Message struct {
// Contents
}
type Subscription struct {
ch chan<- Message
Inbox chan Message
closed bool
inbox chan Message
}
func (s *Subscription) Publish(msg Message) error {
if _, ok := <-s.ch; !ok {
return errors.New("Topic has been closed")
func (s *Subscription) Next() (Message, error) {
if s.closed {
return Message{}, errors.New("subscription closed")
}
m, ok := <-s.inbox
if !ok {
return Message{}, errors.New("subscription closed")
}
return m, nil
}
s.ch <- msg
return nil
func (s *Subscription) Unsubscribe(Subscription) error {
s.closed = true
close(s.inbox)
}
```
```go
type Topic struct {
Subscribers []Session
Subscribers []Subscription
MessageHistory []Message
}
func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
// Get session and create one if it's the first
// Add session to the Topic & MessageHistory
// Create a subscription
func (t *Topic) Subscribe() (Subscription) {
return Subscription{inbox: make(chan Message)}
}
func (t *Topic) Unsubscribe(Subscription) error {
// Implementation
}
func (t *Topic) Publish(msg Message) error {
for _, sub := range t.Subscribers {
if sub.closed {
continue
}
go func() {
sub.inbox <- msg
}()
}
func (t *Topic) Delete() error {
// Implementation
}
```
```go
type User struct {
ID uint64
Name string
}
type Session struct {
User User
Timestamp time.Time
return nil
}
```