add codes contents for message pub/sub

This commit is contained in:
Edward 2020-04-29 17:27:46 +08:00
parent 92f9e30211
commit e7a0048a76
2 changed files with 163 additions and 48 deletions

View File

@ -1,6 +1,7 @@
package gomore
import (
"context"
"errors"
"time"
)
@ -19,9 +20,9 @@ type User struct {
Name string
}
//Session of user
//Session inherit user
type Session struct {
User User
User
Timestamp time.Time
}
@ -29,33 +30,80 @@ type Session struct {
//Subscription is a session
type Subscription struct {
Session
ch chan<- Message //发送队列
Inbox <-chan Message //接收消息的队列
topicName string
ctx context.Context
cancel context.CancelFunc
ch chan<- Message //发送队列
Inbox <-chan Message //接收消息的队列
}
func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription {
ctx, cancel := context.WithCancel(context.Background())
return Subscription{
Session: Session{User{ID: uid}, time.Now()},
ctx: ctx,
cancel: cancel,
ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息
Inbox: make(<-chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息
}
}
//Cancel Message
func (s *Subscription) Cancel() {
s.cancel()
}
//Publish 只有当channel无数据且channel被close了才会返回ok=false
//Publish a message to subscription queue
func (s *Subscription) Publish(msg Message) error {
if _, ok := <-s.ch; !ok {
return errors.New("Topic has been closed")
}
//用go channel 作为队列,接收消息
s.ch <- msg
select {
case <-s.ctx.Done():
return errors.New("Topic has been closed")
default:
s.ch <- msg
}
return nil
}
//Receive message
func (s *Subscription) Receive(out *Message) error {
select {
case <-s.ctx.Done():
return errors.New("Topic has been closed")
default:
*out <- s.Inbox
}
return nil
}
//Topic that user is interested in
//Topic locate in MQ
type Topic struct {
UserQueueSize int
Name string
Subscribers map[uint64]Session //user list
MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间
subscription Subscription
Subscribers map[uint64]Subscription //user list
MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间
}
//Queue hold all topics
type Queue struct {
Topics map[string]*Topic //topic ID<-----> topic Object
Topics map[string]Topic //topic ID<-----> topic Object
}
//AddTopic to Queue
func (q *Queue) AddTopic(topicName string, topicUserQueueSize int) Topic {
if q.Topics == nil {
q.Topics = make(map[string]Topic)
}
if _, found := q.Topics[topicName]; !found {
q.Topics[topicName] = Topic{UserQueueSize: topicUserQueueSize, Name: topicName}
}
return q.Topics[topicName]
}
//String remove Subscription
@ -63,47 +111,42 @@ func (t *Topic) String() string {
return t.Name
}
func (t *Topic) findSession(uid uint64) (Session, bool) {
func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) {
// Get session or create one if it's the first
var Session session
if t.Subscribers == nil || len(t.Subscribers) == 0 {
return Session{}, false
}
if session, found := t.Subscribers[uid]; found {
return session, true
}
return Session{}, false
}
func (t *Topic) addSession(uid uint64) Session {
var Session session
// Get session or create one if it's the first
if session, found := t.findSession(uid); !found {
if t.Subscribers == ni {
t.Subscribers = make(map[uint64]Session)
}
session = Session{User{uid, "no name"}, time.Now()}
t.Subscribers[uid] = session
if topicName != t.Name || t.Subscribers == nil || len(t.Subscribers) == 0 {
return Subscription{}, false
}
return session
if subscription, found := t.Subscribers[uid]; found {
return subscription, true
}
return Subscription{}, false
}
//Subscribe a spec topic
func (t *Topic) Subscribe(uid uint64) (Subscription, error) {
func (t *Topic) Subscribe(uid uint64, topicName string) (Subscription, bool) {
session := t.addSession(uid)
if t.Name != topicName {
return Subscription{}, false
}
// Create a subscription from copy
subscription := Subscription{session, t.subscription.ch, t.subscription.Inbox}
var subscription Subscription
// Get session or create one if it's the first
if _, found := t.findUserSubscription(uid, topicName); !found {
if t.Subscribers == nil {
t.Subscribers = make(map[uint64]Subscription)
}
subscription = newSubscription(uid, topicName, t.UserQueueSize)
t.Subscribers[uid] = subscription
}
return subscription, nil
return subscription, true
}
//Unsubscribe remove Subscription
func (t *Topic) Unsubscribe(s Subscription) error {
if t.findSession(s.User.ID) {
delete(t.Subscribers, s.User.ID)
if _, found := t.findUserSubscription(s.ID, s.topicName); found {
delete(t.Subscribers, s.ID)
}
return nil
}
@ -113,6 +156,5 @@ func (t *Topic) Delete() error {
t.Subscribers = nil
t.Name = ""
t.MessageHistory = nil
t.subscription = Subscription{}
return nil
}

View File

@ -1,15 +1,88 @@
package gomore
import "testing"
import (
"fmt"
"testing"
"time"
)
////////////////////////////////
//通常意义上是,连接消息队列之后就可以发送消息
////当订阅著之后才会收到相关Topic消息的推送
//这里为了简化,直接订阅成功后发送消息,省去连接消息队列发送消息的步骤
////////////////////////////////
func TestMessageSubAndPub(t *testing.T) {
exit := make(chan bool, 1)
//创建一个队列
msgQueue := MesssageQueue{Topics: map[uint64]*Topic{}}
msgQueue := Queue{Topics: map[string]Topic{}}
//创建一个话题
topic := Topic{}
//创建一个感兴趣的话题
topic := msgQueue.AddTopic("i want apple", 10)
//向队列订阅话题
if subSCription123, ok := topic.Subscribe(123, "tom want apple"); ok {
//订阅成功了
go func() {
EXIT
for {
select {
case <-exit:
break EXIT
default:
msg := Message{}
subSCription123.Receive(&msg)
fmt.Println(msg)
}
time.Sleep(200)
}
}()
msg := Message{
//Type 类型[code :1,2,3,4]
Type: 1,
Text: "here is a apple",
From: Session{User{123, "lily"}, time.Now()},
}
subSCription123.Publish(msg)
msg.Type++
subSCription123.Publish(msg)
}
if subSCription456, ok := topic.Subscribe(456, "lily want peach"); ok {
//订阅成功了
//发送一个消息
go func() {
EXIT
for {
select {
case <-exit:
break EXIT
default:
msg := Message{}
subSCription456.Receive(&msg)
fmt.Println(msg)
}
time.Sleep(200)
}
}()
msg := Message{
//Type 类型[code :1,2,3,4]
Type: 1,
Text: "here is a peach",
From: Session{User{123, "bob"}, time.Now()},
}
subSCription456.Publish(msg)
msg.Type++
subSCription456.Publish(msg)
}
//像队列订阅话题
topic.Subscribe(123)
}