finish pub/sub pattern

This commit is contained in:
Edward 2020-04-30 14:50:21 +08:00
parent aae7ee1f5b
commit 2178e802c2
4 changed files with 123 additions and 75 deletions

View File

@ -10,10 +10,12 @@
图片来源:[pubsub-pattern.md](https://github.com/imsardine/dev-notes/blob/source/docs/pubsub-pattern.md) 图片来源:[pubsub-pattern.md](https://github.com/imsardine/dev-notes/blob/source/docs/pubsub-pattern.md)
示例演示消息队的某个主题(Topic)收到用户订阅之后,的处理过程和与用户之间的响应互动。 现实生活中的各种信息平台,就是很好的发布订阅的例子,比如某八戒、某无忧等
这里演示,一个拼车例子,车主发布拼车(Topic)消息,消息推送到订阅拼车(Topic)信息的所有用户.
并模拟以下情形: 并模拟以下情形:
+ 主题向用户发送消息 + 车主(Topic)发布拼车消息
+ 用户订阅主题 + 拼车用户订阅拼车消息(Topic)
+ 用户发送某个主题消息 + 拼车用户处理收到的拼车消息

View File

@ -1,8 +1,9 @@
package gomore package messaging
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"time" "time"
) )
@ -32,8 +33,8 @@ type Subscription struct {
topicName string topicName string
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
ch chan<- Message //发送队列 Ch chan<- Message //发送队列
Inbox <-chan Message //接收消息的队列 Inbox chan Message //接收消息的队列
} }
func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription { func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscription {
@ -44,8 +45,8 @@ func newSubscription(uid uint64, topicName string, UserQueueSize int) Subscripti
Session: Session{User{ID: uid}, time.Now()}, Session: Session{User{ID: uid}, time.Now()},
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息 Ch: make(chan<- Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户发送消息
Inbox: make(<-chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息 Inbox: make(chan Message, UserQueueSize), //用于跟单个用户通信的消息队列,用户接收消息
} }
} }
@ -55,6 +56,8 @@ func (s *Subscription) Cancel() {
s.cancel() s.cancel()
} }
//Publish 这个表示用户订阅到感兴趣的主题的时候,同时也可以发送消息,
//Publish 但是,示例中不演示这个用途
//Publish 只有当channel无数据且channel被close了才会返回ok=false //Publish 只有当channel无数据且channel被close了才会返回ok=false
//Publish a message to subscription queue //Publish a message to subscription queue
func (s *Subscription) Publish(msg Message) error { func (s *Subscription) Publish(msg Message) error {
@ -63,7 +66,7 @@ func (s *Subscription) Publish(msg Message) error {
case <-s.ctx.Done(): case <-s.ctx.Done():
return errors.New("Topic has been closed") return errors.New("Topic has been closed")
default: default:
s.ch <- msg s.Ch <- msg
} }
return nil return nil
} }
@ -74,14 +77,15 @@ func (s *Subscription) Receive(out *Message) error {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return errors.New("Topic has been closed") return errors.New("Topic has been closed")
default: case <-time.After(time.Millisecond * 100):
*out = <-s.Inbox return errors.New("time out error")
case *out = <-s.Inbox:
return nil
} }
return nil
} }
//Topic that user is interested in //Topic that user is interested in
//Topic locate in MQ //Topic should locate in MQ
type Topic struct { type Topic struct {
UserQueueSize int UserQueueSize int
Name string Name string
@ -89,20 +93,22 @@ type Topic struct {
MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间 MessageHistory []Message //当前主题的消息历史,实际项目中可能需要限定大小并设置过期时间
} }
//Queue hold all topics //Publish 只有当channel无数据且channel被close了才会返回ok=false
type Queue struct { //Publish a message to subscription queue
Topics map[string]Topic //topic ID<-----> topic Object func (t *Topic) Publish(msg Message) error {
}
//AddTopic to Queue //将消息发布给当前Topic的所有人
func (q *Queue) AddTopic(topicName string, topicUserQueueSize int) Topic { for usersID, subscription := range t.Subscribers {
if q.Topics == nil { if subscription.ID == usersID {
q.Topics = make(map[string]Topic) subscription.Inbox <- msg
}
} }
if _, found := q.Topics[topicName]; !found { //save message history
q.Topics[topicName] = Topic{UserQueueSize: topicUserQueueSize, Name: topicName} t.MessageHistory = append(t.MessageHistory, msg)
}
return q.Topics[topicName] fmt.Println("current histroy message count: ", len(t.MessageHistory))
return nil
} }
func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) { func (t *Topic) findUserSubscription(uid uint64, topicName string) (Subscription, bool) {
@ -124,17 +130,15 @@ func (t *Topic) Subscribe(uid uint64, topicName string) (Subscription, bool) {
return Subscription{}, false return Subscription{}, false
} }
var subscription Subscription
// Get session or create one if it's the first // Get session or create one if it's the first
if _, found := t.findUserSubscription(uid, topicName); !found { if _, found := t.findUserSubscription(uid, topicName); !found {
if t.Subscribers == nil { if t.Subscribers == nil {
t.Subscribers = make(map[uint64]Subscription) t.Subscribers = make(map[uint64]Subscription)
} }
subscription = newSubscription(uid, topicName, t.UserQueueSize) t.Subscribers[uid] = newSubscription(uid, topicName, t.UserQueueSize)
t.Subscribers[uid] = subscription
} }
return subscription, true return t.Subscribers[uid], true
} }
//Unsubscribe remove Subscription //Unsubscribe remove Subscription

View File

@ -0,0 +1,17 @@
package messaging
//Queue hold all topics
type Queue struct {
Topics map[string]Topic //topic ID<-----> topic Object
}
//AddTopic to Queue
func (q *Queue) AddTopic(topicName string, topicUserQueueSize int) Topic {
if q.Topics == nil {
q.Topics = make(map[string]Topic)
}
if _, found := q.Topics[topicName]; !found {
q.Topics[topicName] = Topic{UserQueueSize: topicUserQueueSize, Name: topicName}
}
return q.Topics[topicName]
}

View File

@ -1,7 +1,9 @@
package gomore package messaging
import ( import (
"context"
"fmt" "fmt"
"sync"
"testing" "testing"
"time" "time"
) )
@ -9,76 +11,99 @@ import (
//////////////////////////////// ////////////////////////////////
//通常意义上是,连接消息队列之后就可以发送消息 //通常意义上是,连接消息队列之后就可以发送消息
//当订阅著之后才会收到相关Topic消息的推送 //当订阅著之后才会收到相关Topic消息的推送
//这里为了简化,直接订阅成功后发送消息,省去连接消息队列发送消息的步骤
//////////////////////////////// ////////////////////////////////
func TestMessageSubAndPub(t *testing.T) {
exit := make(chan bool, 1) func TestMessageSubAndPubWithTopic(t *testing.T) {
//创建一个队列 var wg sync.WaitGroup
msgQueue := Queue{Topics: map[string]Topic{}}
//创建一个感兴趣的话题 topicName := "seeking passengers"
topic := msgQueue.AddTopic("i want apple", 10) //假设评估
topic := Topic{
Name: topicName,
UserQueueSize: 5,
}
//向队列订阅话题 ctx, cancel := context.WithCancel(context.Background())
if subSCription123, ok := topic.Subscribe(123, "tom want apple"); ok {
//订阅成功了 wg.Add(1)
//user 1
//用户tom订阅拼车消息,订阅的是车主发布的拼车消息
if subScriberTom, ok := topic.Subscribe(123, topicName); ok {
go func() { go func() {
defer wg.Done()
EXIT: EXIT:
for { for {
select { select {
case <-exit: case <-ctx.Done():
fmt.Println("tom receive cancel, exit")
break EXIT break EXIT
default: default:
msg := Message{} msg := Message{}
subSCription123.Receive(&msg) err := subScriberTom.Receive(&msg)
fmt.Println(msg) if err == nil {
fmt.Println("tom receive subscribed msg:", msg)
}
} }
time.Sleep(200) time.Sleep(200)
} }
}() }()
}
wg.Add(1)
//订阅成功了
//发送一个消息
//用户Lily订阅拼车消息,订阅的是车主发布的拼车消息
if subSCriptionLily, ok := topic.Subscribe(456, topicName); ok {
go func() {
defer wg.Done()
EXIT:
for {
select {
case <-ctx.Done():
fmt.Println("lily receive cancel, exit")
break EXIT
default:
msg := Message{}
err := subSCriptionLily.Receive(&msg)
if err == nil {
fmt.Println("lily receive subscribed msg:", msg)
}
}
time.Sleep(200)
}
}()
}
go func() {
//模拟发送消息
msg := Message{ msg := Message{
Text: "here is a apple", Text: "i am looking for 1 passenger",
From: Session{User{123, "lily"}, time.Now()}, From: Session{User{123, "lily"}, time.Now()},
} }
subSCription123.Publish(msg) topic.Publish(msg)
msg.Seq++
subSCription123.Publish(msg)
}
if subSCription456, ok := topic.Subscribe(456, "lily want peach"); ok { msg = Message{
Text: "i am looking for 2 passenger",
//订阅成功了 From: Session{User{123, "lucy"}, time.Now()},
//发送一个消息
go func() {
EXIT:
for {
select {
case <-exit:
break EXIT
default:
msg := Message{}
subSCription456.Receive(&msg)
fmt.Println(msg)
}
time.Sleep(200)
}
}()
msg := Message{
Text: "here is a peach",
From: Session{User{123, "bob"}, time.Now()},
} }
subSCription456.Publish(msg) topic.Publish(msg)
msg.Seq++ msg = Message{
Text: "i am looking for passenger as many as i can",
From: Session{User{123, "rose"}, time.Now()},
}
subSCription456.Publish(msg) topic.Publish(msg)
time.Sleep(time.Second)
cancel()
} }()
wg.Wait()
fmt.Println("all message done,exit it")
} }