From 2178e802c28542c803cb75411d9791b707f0def3 Mon Sep 17 00:00:00 2001 From: Edward Date: Thu, 30 Apr 2020 14:50:21 +0800 Subject: [PATCH] finish pub/sub pattern --- gomore/27_messages/README.md | 10 ++- gomore/27_messages/message.go | 56 +++++++------- gomore/27_messages/message_queue.go | 17 ++++ gomore/27_messages/message_test.go | 115 +++++++++++++++++----------- 4 files changed, 123 insertions(+), 75 deletions(-) create mode 100644 gomore/27_messages/message_queue.go diff --git a/gomore/27_messages/README.md b/gomore/27_messages/README.md index 648a61a..85e605a 100644 --- a/gomore/27_messages/README.md +++ b/gomore/27_messages/README.md @@ -10,10 +10,12 @@ 图片来源:[pubsub-pattern.md](https://github.com/imsardine/dev-notes/blob/source/docs/pubsub-pattern.md) -示例演示消息队的某个主题(Topic)收到用户订阅之后,的处理过程和与用户之间的响应互动。 +现实生活中的各种信息平台,就是很好的发布订阅的例子,比如某八戒、某无忧等 + +这里演示,一个拼车例子,车主发布拼车(Topic)消息,消息推送到订阅拼车(Topic)信息的所有用户. 并模拟以下情形: -+ 主题向用户发送消息 -+ 用户订阅主题 -+ 用户发送某个主题消息 ++ 车主(Topic)发布拼车消息 ++ 拼车用户订阅拼车消息(Topic) ++ 拼车用户处理收到的拼车消息 diff --git a/gomore/27_messages/message.go b/gomore/27_messages/message.go index 23094b7..5f44630 100644 --- a/gomore/27_messages/message.go +++ b/gomore/27_messages/message.go @@ -1,8 +1,9 @@ -package gomore +package messaging import ( "context" "errors" + "fmt" "time" ) @@ -32,8 +33,8 @@ type Subscription struct { topicName string ctx context.Context cancel context.CancelFunc - ch chan<- Message //发送队列 - Inbox <-chan Message //接收消息的队列 + Ch chan<- Message //发送队列 + Inbox chan Message //接收消息的队列 } func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription { @@ -44,8 +45,8 @@ func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscripti Session: Session{User{ID: uid}, time.Now()}, ctx: ctx, cancel: cancel, - ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息 - Inbox: make(<-chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息 + Ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息 + Inbox: make(chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息 } } @@ -55,6 +56,8 @@ func (s *Subscription) Cancel() { s.cancel() } +//Publish 这个表示用户订阅到感兴趣的主题的时候,同时也可以发送消息, +//Publish 但是,示例中不演示这个用途 //Publish 只有当channel无数据,且channel被close了,才会返回ok=false //Publish a message to subscription queue func (s *Subscription) Publish(msg Message) error { @@ -63,7 +66,7 @@ func (s *Subscription) Publish(msg Message) error { case <-s.ctx.Done(): return errors.New("Topic has been closed") default: - s.ch <- msg + s.Ch <- msg } return nil } @@ -74,14 +77,15 @@ func (s *Subscription) Receive(out *Message) error { select { case <-s.ctx.Done(): return errors.New("Topic has been closed") - default: - *out = <-s.Inbox + case <-time.After(time.Millisecond * 100): + return errors.New("time out error") + case *out = <-s.Inbox: + return nil } - return nil } //Topic that user is interested in -//Topic locate in MQ +//Topic should locate in MQ type Topic struct { UserQueueSize int Name string @@ -89,20 +93,22 @@ type Topic struct { MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间 } -//Queue hold all topics -type Queue struct { - Topics map[string]Topic //topic ID<-----> topic Object -} +//Publish 只有当channel无数据,且channel被close了,才会返回ok=false +//Publish a message to subscription queue +func (t *Topic) Publish(msg Message) error { -//AddTopic to Queue -func (q *Queue) AddTopic(topicName string, topicUserQueueSize int) Topic { - if q.Topics == nil { - q.Topics = make(map[string]Topic) + //将消息发布给当前Topic的所有人 + for usersID, subscription := range t.Subscribers { + if subscription.ID == usersID { + subscription.Inbox <- msg + } } - if _, found := q.Topics[topicName]; !found { - q.Topics[topicName] = Topic{UserQueueSize: topicUserQueueSize, Name: topicName} - } - return q.Topics[topicName] + //save message history + t.MessageHistory = append(t.MessageHistory, msg) + + fmt.Println("current histroy message count: ", len(t.MessageHistory)) + + return nil } func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) { @@ -124,17 +130,15 @@ func (t *Topic) Subscribe(uid uint64, topicName string) (Subscription, bool) { return Subscription{}, false } - var subscription Subscription // Get session or create one if it's the first if _, found := t.findUserSubscription(uid, topicName); !found { if t.Subscribers == nil { t.Subscribers = make(map[uint64]Subscription) } - subscription = newSubscription(uid, topicName, t.UserQueueSize) - t.Subscribers[uid] = subscription + t.Subscribers[uid] = newSubscription(uid, topicName, t.UserQueueSize) } - return subscription, true + return t.Subscribers[uid], true } //Unsubscribe remove Subscription diff --git a/gomore/27_messages/message_queue.go b/gomore/27_messages/message_queue.go new file mode 100644 index 0000000..5199de6 --- /dev/null +++ b/gomore/27_messages/message_queue.go @@ -0,0 +1,17 @@ +package messaging + +//Queue hold all topics +type Queue struct { + Topics map[string]Topic //topic ID<-----> topic Object +} + +//AddTopic to Queue +func (q *Queue) AddTopic(topicName string, topicUserQueueSize int) Topic { + if q.Topics == nil { + q.Topics = make(map[string]Topic) + } + if _, found := q.Topics[topicName]; !found { + q.Topics[topicName] = Topic{UserQueueSize: topicUserQueueSize, Name: topicName} + } + return q.Topics[topicName] +} diff --git a/gomore/27_messages/message_test.go b/gomore/27_messages/message_test.go index d4ae37a..fcec0f1 100644 --- a/gomore/27_messages/message_test.go +++ b/gomore/27_messages/message_test.go @@ -1,7 +1,9 @@ -package gomore +package messaging import ( + "context" "fmt" + "sync" "testing" "time" ) @@ -9,76 +11,99 @@ import ( //////////////////////////////// //通常意义上是,连接消息队列之后就可以发送消息 //当订阅著之后才会收到相关Topic消息的推送 -//这里为了简化,直接订阅成功后发送消息,省去连接消息队列发送消息的步骤 //////////////////////////////// -func TestMessageSubAndPub(t *testing.T) { - exit := make(chan bool, 1) - //创建一个队列 - msgQueue := Queue{Topics: map[string]Topic{}} +func TestMessageSubAndPubWithTopic(t *testing.T) { + var wg sync.WaitGroup - //创建一个感兴趣的话题 - topic := msgQueue.AddTopic("i want apple", 10) + topicName := "seeking passengers" + //假设评估 + topic := Topic{ + Name: topicName, + UserQueueSize: 5, + } - //向队列订阅话题 - if subSCription123, ok := topic.Subscribe(123, "tom want apple"); ok { + ctx, cancel := context.WithCancel(context.Background()) - //订阅成功了 + wg.Add(1) + //user 1 + + //用户tom订阅拼车消息,订阅的是车主发布的拼车消息 + if subScriberTom, ok := topic.Subscribe(123, topicName); ok { go func() { + defer wg.Done() EXIT: for { select { - case <-exit: + case <-ctx.Done(): + fmt.Println("tom receive cancel, exit") break EXIT default: msg := Message{} - subSCription123.Receive(&msg) - fmt.Println(msg) + err := subScriberTom.Receive(&msg) + if err == nil { + fmt.Println("tom receive subscribed msg:", msg) + } } time.Sleep(200) } }() + } + wg.Add(1) + //订阅成功了 + //发送一个消息 + + //用户Lily订阅拼车消息,订阅的是车主发布的拼车消息 + if subSCriptionLily, ok := topic.Subscribe(456, topicName); ok { + go func() { + defer wg.Done() + EXIT: + for { + select { + case <-ctx.Done(): + fmt.Println("lily receive cancel, exit") + break EXIT + default: + msg := Message{} + err := subSCriptionLily.Receive(&msg) + if err == nil { + fmt.Println("lily receive subscribed msg:", msg) + } + } + time.Sleep(200) + } + }() + } + + go func() { + //模拟发送消息 msg := Message{ - Text: "here is a apple", + Text: "i am looking for 1 passenger", From: Session{User{123, "lily"}, time.Now()}, } - subSCription123.Publish(msg) - msg.Seq++ - subSCription123.Publish(msg) - } + topic.Publish(msg) - if subSCription456, ok := topic.Subscribe(456, "lily want peach"); ok { - - //订阅成功了 - //发送一个消息 - go func() { - EXIT: - for { - select { - case <-exit: - break EXIT - default: - msg := Message{} - subSCription456.Receive(&msg) - fmt.Println(msg) - } - time.Sleep(200) - } - }() - - msg := Message{ - Text: "here is a peach", - From: Session{User{123, "bob"}, time.Now()}, + msg = Message{ + Text: "i am looking for 2 passenger", + From: Session{User{123, "lucy"}, time.Now()}, } - subSCription456.Publish(msg) + topic.Publish(msg) - msg.Seq++ + msg = Message{ + Text: "i am looking for passenger as many as i can", + From: Session{User{123, "rose"}, time.Now()}, + } - subSCription456.Publish(msg) + topic.Publish(msg) + time.Sleep(time.Second) + cancel() - } + }() + + wg.Wait() + fmt.Println("all message done,exit it") }