From 7619991e0f936c5790a9aa77dea00c7d53622a84 Mon Sep 17 00:00:00 2001 From: Edward Date: Wed, 29 Apr 2020 14:09:22 +0800 Subject: [PATCH] update message pattern --- gomore/27_messages/message.go | 72 ++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/gomore/27_messages/message.go b/gomore/27_messages/message.go index df860f0..1d5d374 100644 --- a/gomore/27_messages/message.go +++ b/gomore/27_messages/message.go @@ -7,8 +7,10 @@ import ( //Message for msg in Message bus type Message struct { - Alarm int - priority int + //Type 类型[code :1,2,3,4] + Type int + Text string + From Session //消息来源 } //User for user @@ -24,9 +26,11 @@ type Session struct { } //Subscription for user +//Subscription is a session type Subscription struct { - ch chan Message - Inbox chan Message + Session + ch chan<- Message //发送队列 + Inbox <-chan Message //接收消息的队列 } //Publish a message to subscription queue @@ -41,16 +45,17 @@ func (s *Subscription) Publish(msg Message) error { } //Topic that user is interested in +//Topic locate in MQ type Topic struct { - uid uint64 Name string - Subscribers []Session //user list - MessageHistory []Message //当前主题的消息历史,实际项目中需要限定大小并设置过期时间 + Subscribers map[uint64]Session //user list + MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间 + subscription Subscription } -//MesssageQueue of manager all topics -type MesssageQueue struct { - Topics map[uint64]*Topic +//Queue hold all topics +type Queue struct { + Topics map[string]*Topic //topic ID<-----> topic Object } //String remove Subscription @@ -58,25 +63,56 @@ func (t *Topic) String() string { return t.Name } -//Subscribe a topic -func (t *Topic) Subscribe(uid uint64) (Subscription, error) { +func (t *Topic) findSession(uid uint64) (Session, bool) { // Get session or create one if it's the first + var Session session + if t.Subscribers == nil || len(t.Subscribers) == 0 { + return Session{}, false + } + if session, found := t.Subscribers[uid]; found { + return session, true + } + return Session{}, false +} - // Add session to the Topic & MessageHistory +func (t *Topic) addSession(uid uint64) Session { - // Create a subscription + var Session session + // Get session or create one if it's the first + if session, found := t.findSession(uid); !found { + if t.Subscribers == ni { + t.Subscribers = make(map[uint64]Session) + } + session = Session{User{uid, "no name"}, time.Now()} + t.Subscribers[uid] = session + } + return session +} - return Subscription{}, nil +//Subscribe a spec topic +func (t *Topic) Subscribe(uid uint64) (Subscription, error) { + + session := t.addSession(uid) + + // Create a subscription from copy + subscription := Subscription{session, t.subscription.ch, t.subscription.Inbox} + + return subscription, nil } //Unsubscribe remove Subscription -func (t *Topic) Unsubscribe(Subscription) error { - +func (t *Topic) Unsubscribe(s Subscription) error { + if t.findSession(s.User.ID) { + delete(t.Subscribers, s.User.ID) + } return nil } //Delete topic func (t *Topic) Delete() error { - + t.Subscribers = nil + t.Name = "" + t.MessageHistory = nil + t.subscription = Subscription{} return nil }