From e7a0048a763909315baebfdcc7dd3553e816732d Mon Sep 17 00:00:00 2001 From: Edward Date: Wed, 29 Apr 2020 17:27:46 +0800 Subject: [PATCH] add codes contents for message pub/sub --- gomore/27_messages/message.go | 126 +++++++++++++++++++---------- gomore/27_messages/message_test.go | 85 +++++++++++++++++-- 2 files changed, 163 insertions(+), 48 deletions(-) diff --git a/gomore/27_messages/message.go b/gomore/27_messages/message.go index 1d5d374..edecbec 100644 --- a/gomore/27_messages/message.go +++ b/gomore/27_messages/message.go @@ -1,6 +1,7 @@ package gomore import ( + "context" "errors" "time" ) @@ -19,9 +20,9 @@ type User struct { Name string } -//Session of user +//Session inherit user type Session struct { - User User + User Timestamp time.Time } @@ -29,33 +30,80 @@ type Session struct { //Subscription is a session type Subscription struct { Session - ch chan<- Message //发送队列 - Inbox <-chan Message //接收消息的队列 + topicName string + ctx context.Context + cancel context.CancelFunc + ch chan<- Message //发送队列 + Inbox <-chan Message //接收消息的队列 } +func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription { + + ctx, cancel := context.WithCancel(context.Background()) + + return Subscription{ + Session: Session{User{ID: uid}, time.Now()}, + ctx: ctx, + cancel: cancel, + ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息 + Inbox: make(<-chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息 + } + +} + +//Cancel Message +func (s *Subscription) Cancel() { + s.cancel() +} + +//Publish 只有当channel无数据,且channel被close了,才会返回ok=false //Publish a message to subscription queue func (s *Subscription) Publish(msg Message) error { - if _, ok := <-s.ch; !ok { - return errors.New("Topic has been closed") - } - //用go channel 作为队列,接收消息 - s.ch <- msg + select { + case <-s.ctx.Done(): + return errors.New("Topic has been closed") + default: + s.ch <- msg + } + return nil +} + +//Receive message +func (s *Subscription) Receive(out *Message) error { + + select { + case <-s.ctx.Done(): + return errors.New("Topic has been closed") + default: + *out <- s.Inbox + } return nil } //Topic that user is interested in //Topic locate in MQ type Topic struct { + UserQueueSize int Name string - Subscribers map[uint64]Session //user list - MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间 - subscription Subscription + Subscribers map[uint64]Subscription //user list + MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间 } //Queue hold all topics type Queue struct { - Topics map[string]*Topic //topic ID<-----> topic Object + Topics map[string]Topic //topic ID<-----> topic Object +} + +//AddTopic to Queue +func (q *Queue) AddTopic(topicName string, topicUserQueueSize int) Topic { + if q.Topics == nil { + q.Topics = make(map[string]Topic) + } + if _, found := q.Topics[topicName]; !found { + q.Topics[topicName] = Topic{UserQueueSize: topicUserQueueSize, Name: topicName} + } + return q.Topics[topicName] } //String remove Subscription @@ -63,47 +111,42 @@ func (t *Topic) String() string { return t.Name } -func (t *Topic) findSession(uid uint64) (Session, bool) { +func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) { // Get session or create one if it's the first - var Session session - if t.Subscribers == nil || len(t.Subscribers) == 0 { - return Session{}, false - } - if session, found := t.Subscribers[uid]; found { - return session, true - } - return Session{}, false -} -func (t *Topic) addSession(uid uint64) Session { - - var Session session - // Get session or create one if it's the first - if session, found := t.findSession(uid); !found { - if t.Subscribers == ni { - t.Subscribers = make(map[uint64]Session) - } - session = Session{User{uid, "no name"}, time.Now()} - t.Subscribers[uid] = session + if topicName != t.Name || t.Subscribers == nil || len(t.Subscribers) == 0 { + return Subscription{}, false } - return session + if subscription, found := t.Subscribers[uid]; found { + return subscription, true + } + return Subscription{}, false } //Subscribe a spec topic -func (t *Topic) Subscribe(uid uint64) (Subscription, error) { +func (t *Topic) Subscribe(uid uint64, topicName string) (Subscription, bool) { - session := t.addSession(uid) + if t.Name != topicName { + return Subscription{}, false + } - // Create a subscription from copy - subscription := Subscription{session, t.subscription.ch, t.subscription.Inbox} + var subscription Subscription + // Get session or create one if it's the first + if _, found := t.findUserSubscription(uid, topicName); !found { + if t.Subscribers == nil { + t.Subscribers = make(map[uint64]Subscription) + } + subscription = newSubscription(uid, topicName, t.UserQueueSize) + t.Subscribers[uid] = subscription + } - return subscription, nil + return subscription, true } //Unsubscribe remove Subscription func (t *Topic) Unsubscribe(s Subscription) error { - if t.findSession(s.User.ID) { - delete(t.Subscribers, s.User.ID) + if _, found := t.findUserSubscription(s.ID, s.topicName); found { + delete(t.Subscribers, s.ID) } return nil } @@ -113,6 +156,5 @@ func (t *Topic) Delete() error { t.Subscribers = nil t.Name = "" t.MessageHistory = nil - t.subscription = Subscription{} return nil } diff --git a/gomore/27_messages/message_test.go b/gomore/27_messages/message_test.go index 6f1b26f..cd2887f 100644 --- a/gomore/27_messages/message_test.go +++ b/gomore/27_messages/message_test.go @@ -1,15 +1,88 @@ package gomore -import "testing" +import ( + "fmt" + "testing" + "time" +) +//////////////////////////////// +//通常意义上是,连接消息队列之后就可以发送消息 +////当订阅著之后才会收到相关Topic消息的推送 +//这里为了简化,直接订阅成功后发送消息,省去连接消息队列发送消息的步骤 +//////////////////////////////// func TestMessageSubAndPub(t *testing.T) { + exit := make(chan bool, 1) //创建一个队列 - msgQueue := MesssageQueue{Topics: map[uint64]*Topic{}} + msgQueue := Queue{Topics: map[string]Topic{}} - //创建一个话题 - topic := Topic{} + //创建一个感兴趣的话题 + topic := msgQueue.AddTopic("i want apple", 10) + + //向队列订阅话题 + if subSCription123, ok := topic.Subscribe(123, "tom want apple"); ok { + + //订阅成功了 + + go func() { + EXIT + for { + select { + case <-exit: + break EXIT + default: + msg := Message{} + subSCription123.Receive(&msg) + fmt.Println(msg) + } + time.Sleep(200) + } + }() + + msg := Message{ + //Type 类型[code :1,2,3,4] + Type: 1, + Text: "here is a apple", + From: Session{User{123, "lily"}, time.Now()}, + } + subSCription123.Publish(msg) + msg.Type++ + subSCription123.Publish(msg) + } + + if subSCription456, ok := topic.Subscribe(456, "lily want peach"); ok { + + //订阅成功了 + //发送一个消息 + go func() { + EXIT + for { + select { + case <-exit: + break EXIT + default: + msg := Message{} + subSCription456.Receive(&msg) + fmt.Println(msg) + } + time.Sleep(200) + } + }() + + msg := Message{ + //Type 类型[code :1,2,3,4] + Type: 1, + Text: "here is a peach", + From: Session{User{123, "bob"}, time.Now()}, + } + + subSCription456.Publish(msg) + + msg.Type++ + + subSCription456.Publish(msg) + + } - //像队列订阅话题 - topic.Subscribe(123) }