go-pattern-examples/gomore/01_messages/message.go

159 lines
3.7 KiB
Go
Raw Permalink Normal View History

2020-04-30 09:50:21 +03:00
package messaging
2020-04-28 07:31:29 +03:00
import (
2020-04-29 12:27:46 +03:00
"context"
2020-04-28 07:31:29 +03:00
"errors"
2020-04-30 09:50:21 +03:00
"fmt"
2020-04-28 07:31:29 +03:00
"time"
)
2020-04-28 12:24:15 +03:00
//Message for msg in Message bus
2020-04-28 07:31:29 +03:00
type Message struct {
2020-04-29 17:44:12 +03:00
Seq int
2020-04-29 09:09:22 +03:00
Text string
From Session //消息来源
2020-04-28 07:31:29 +03:00
}
//User for user
type User struct {
ID uint64
Name string
}
2020-04-29 12:27:46 +03:00
//Session inherit user
2020-04-28 07:31:29 +03:00
type Session struct {
2020-04-29 12:27:46 +03:00
User
2020-04-28 07:31:29 +03:00
Timestamp time.Time
}
//Subscription for user
2020-04-29 09:09:22 +03:00
//Subscription is a session
2020-04-28 07:31:29 +03:00
type Subscription struct {
2020-04-29 09:09:22 +03:00
Session
2020-04-29 12:27:46 +03:00
topicName string
ctx context.Context
cancel context.CancelFunc
2020-04-30 09:50:21 +03:00
Ch chan<- Message //发送队列
Inbox chan Message //接收消息的队列
2020-04-28 07:31:29 +03:00
}
2020-04-29 12:27:46 +03:00
func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription {
ctx, cancel := context.WithCancel(context.Background())
return Subscription{
Session: Session{User{ID: uid}, time.Now()},
ctx: ctx,
cancel: cancel,
2020-04-30 09:50:21 +03:00
Ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息
Inbox: make(chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息
2020-04-29 12:27:46 +03:00
}
}
//Cancel Message
func (s *Subscription) Cancel() {
s.cancel()
}
2020-04-30 09:50:21 +03:00
//Publish 这个表示用户订阅到感兴趣的主题的时候,同时也可以发送消息,
//Publish 但是,示例中不演示这个用途
2020-04-29 12:27:46 +03:00
//Publish 只有当channel无数据且channel被close了才会返回ok=false
2020-04-28 07:31:29 +03:00
//Publish a message to subscription queue
func (s *Subscription) Publish(msg Message) error {
2020-04-29 12:27:46 +03:00
select {
case <-s.ctx.Done():
2020-04-28 07:31:29 +03:00
return errors.New("Topic has been closed")
2020-04-29 12:27:46 +03:00
default:
2020-04-30 09:50:21 +03:00
s.Ch <- msg
2020-04-28 07:31:29 +03:00
}
2020-04-29 12:27:46 +03:00
return nil
}
2020-04-28 07:31:29 +03:00
2020-04-29 12:27:46 +03:00
//Receive message
func (s *Subscription) Receive(out *Message) error {
select {
case <-s.ctx.Done():
return errors.New("Topic has been closed")
2020-04-30 09:50:21 +03:00
case <-time.After(time.Millisecond * 100):
return errors.New("time out error")
case *out = <-s.Inbox:
return nil
2020-04-29 12:27:46 +03:00
}
2020-04-28 07:31:29 +03:00
}
//Topic that user is interested in
2020-04-30 09:50:21 +03:00
//Topic should locate in MQ
2020-04-28 07:31:29 +03:00
type Topic struct {
2020-04-29 12:27:46 +03:00
UserQueueSize int
2020-04-28 12:24:15 +03:00
Name string
2020-04-29 12:27:46 +03:00
Subscribers map[uint64]Subscription //user list
MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间
2020-04-28 12:24:15 +03:00
}
2020-04-30 09:50:21 +03:00
//Publish 只有当channel无数据且channel被close了才会返回ok=false
//Publish a message to subscription queue
func (t *Topic) Publish(msg Message) error {
2020-04-29 12:27:46 +03:00
2020-04-30 09:50:21 +03:00
//将消息发布给当前Topic的所有人
for usersID, subscription := range t.Subscribers {
if subscription.ID == usersID {
subscription.Inbox <- msg
}
2020-04-29 12:27:46 +03:00
}
2020-04-30 09:50:21 +03:00
//save message history
t.MessageHistory = append(t.MessageHistory, msg)
fmt.Println("current histroy message count: ", len(t.MessageHistory))
return nil
2020-04-28 17:48:25 +03:00
}
2020-04-29 12:27:46 +03:00
func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) {
2020-04-29 09:09:22 +03:00
// Get session or create one if it's the first
2020-04-29 12:27:46 +03:00
if topicName != t.Name || t.Subscribers == nil || len(t.Subscribers) == 0 {
return Subscription{}, false
2020-04-29 09:09:22 +03:00
}
2020-04-29 12:27:46 +03:00
if subscription, found := t.Subscribers[uid]; found {
return subscription, true
2020-04-29 09:09:22 +03:00
}
2020-04-29 12:27:46 +03:00
return Subscription{}, false
2020-04-29 09:09:22 +03:00
}
2020-04-29 12:27:46 +03:00
//Subscribe a spec topic
func (t *Topic) Subscribe(uid uint64, topicName string) (Subscription, bool) {
2020-04-29 09:09:22 +03:00
2020-04-29 12:27:46 +03:00
if t.Name != topicName {
return Subscription{}, false
}
2020-04-28 12:24:15 +03:00
// Get session or create one if it's the first
2020-04-29 12:27:46 +03:00
if _, found := t.findUserSubscription(uid, topicName); !found {
if t.Subscribers == nil {
t.Subscribers = make(map[uint64]Subscription)
2020-04-29 09:09:22 +03:00
}
2020-04-30 09:50:21 +03:00
t.Subscribers[uid] = newSubscription(uid, topicName, t.UserQueueSize)
2020-04-29 09:09:22 +03:00
}
2020-04-28 07:31:29 +03:00
2020-04-30 09:50:21 +03:00
return t.Subscribers[uid], true
2020-04-28 07:31:29 +03:00
}
//Unsubscribe remove Subscription
2020-04-29 09:09:22 +03:00
func (t *Topic) Unsubscribe(s Subscription) error {
2020-04-29 12:27:46 +03:00
if _, found := t.findUserSubscription(s.ID, s.topicName); found {
delete(t.Subscribers, s.ID)
2020-04-29 09:09:22 +03:00
}
2020-04-28 07:31:29 +03:00
return nil
}
2020-04-28 12:24:15 +03:00
//Delete topic
2020-04-28 07:31:29 +03:00
func (t *Topic) Delete() error {
2020-04-29 09:09:22 +03:00
t.Subscribers = nil
t.Name = ""
t.MessageHistory = nil
2020-04-28 07:31:29 +03:00
return nil
}