update message pattern

This commit is contained in:
Edward 2020-04-29 14:09:22 +08:00
parent 43a7c53f97
commit 7619991e0f

View File

@ -7,8 +7,10 @@ import (
//Message for msg in Message bus //Message for msg in Message bus
type Message struct { type Message struct {
Alarm int //Type 类型[code :1,2,3,4]
priority int Type int
Text string
From Session //消息来源
} }
//User for user //User for user
@ -24,9 +26,11 @@ type Session struct {
} }
//Subscription for user //Subscription for user
//Subscription is a session
type Subscription struct { type Subscription struct {
ch chan Message Session
Inbox chan Message ch chan<- Message //发送队列
Inbox <-chan Message //接收消息的队列
} }
//Publish a message to subscription queue //Publish a message to subscription queue
@ -41,16 +45,17 @@ func (s *Subscription) Publish(msg Message) error {
} }
//Topic that user is interested in //Topic that user is interested in
//Topic locate in MQ
type Topic struct { type Topic struct {
uid uint64
Name string Name string
Subscribers []Session //user list Subscribers map[uint64]Session //user list
MessageHistory []Message //当前主题的消息历史,实际项目中需要限定大小并设置过期时间 MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间
subscription Subscription
} }
//MesssageQueue of manager all topics //Queue hold all topics
type MesssageQueue struct { type Queue struct {
Topics map[uint64]*Topic Topics map[string]*Topic //topic ID<-----> topic Object
} }
//String remove Subscription //String remove Subscription
@ -58,25 +63,56 @@ func (t *Topic) String() string {
return t.Name return t.Name
} }
//Subscribe a topic func (t *Topic) findSession(uid uint64) (Session, bool) {
func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
// Get session or create one if it's the first // Get session or create one if it's the first
var Session session
if t.Subscribers == nil || len(t.Subscribers) == 0 {
return Session{}, false
}
if session, found := t.Subscribers[uid]; found {
return session, true
}
return Session{}, false
}
// Add session to the Topic & MessageHistory func (t *Topic) addSession(uid uint64) Session {
// Create a subscription var Session session
// Get session or create one if it's the first
if session, found := t.findSession(uid); !found {
if t.Subscribers == ni {
t.Subscribers = make(map[uint64]Session)
}
session = Session{User{uid, "no name"}, time.Now()}
t.Subscribers[uid] = session
}
return session
}
return Subscription{}, nil //Subscribe a spec topic
func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
session := t.addSession(uid)
// Create a subscription from copy
subscription := Subscription{session, t.subscription.ch, t.subscription.Inbox}
return subscription, nil
} }
//Unsubscribe remove Subscription //Unsubscribe remove Subscription
func (t *Topic) Unsubscribe(Subscription) error { func (t *Topic) Unsubscribe(s Subscription) error {
if t.findSession(s.User.ID) {
delete(t.Subscribers, s.User.ID)
}
return nil return nil
} }
//Delete topic //Delete topic
func (t *Topic) Delete() error { func (t *Topic) Delete() error {
t.Subscribers = nil
t.Name = ""
t.MessageHistory = nil
t.subscription = Subscription{}
return nil return nil
} }