diff --git a/gomore/01_messages/README.md b/gomore/01_messages/README.md index 85e605a..cd4b774 100644 --- a/gomore/01_messages/README.md +++ b/gomore/01_messages/README.md @@ -10,7 +10,9 @@ 图片来源:[pubsub-pattern.md](https://github.com/imsardine/dev-notes/blob/source/docs/pubsub-pattern.md) -现实生活中的各种信息平台,就是很好的发布订阅的例子,比如某八戒、某无忧等 +现实生活中的各种信息平台,就是很好的发布订阅的例子,比如某八戒、某无忧等。 + +在发布订阅模型中,每条消息都会传送给多个订阅者。发布者通常不会知道、也不关心哪一个订阅者正在接收主题消息。订阅者和发布者可以在运行时动态添加是一种松散的耦合关系,这使得系统的复杂性可以随时间的推移而增长。在现实生活中,不同城市的象天气预报也是这个模式。 这里演示,一个拼车例子,车主发布拼车(Topic)消息,消息推送到订阅拼车(Topic)信息的所有用户. diff --git a/gomore/01_messages/message_weather.go b/gomore/01_messages/message_weather.go new file mode 100644 index 0000000..1444e1b --- /dev/null +++ b/gomore/01_messages/message_weather.go @@ -0,0 +1,84 @@ +package messaging + +import ( + "sync" + "time" +) + +type ( + subscriber chan interface{} // 订阅者为一个管道 + topicFunc func(v interface{}) bool // 主题为一个过滤器 +) + +//Publisher 发布者对象 +type Publisher struct { + m sync.RWMutex // 读写锁 + buffer int // 订阅队列的缓存大小 + timeout time.Duration // 发布超时时间 + subscribers map[subscriber]topicFunc // 订阅者信息 +} + +//NewPublisher 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度 +func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { + return &Publisher{ + buffer: buffer, + timeout: publishTimeout, + subscribers: make(map[subscriber]topicFunc), + } +} + +//Subscribe 添加一个新的订阅者,订阅全部主题 +func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +//SubscribeTopic 添加一个新的订阅者,订阅过滤器筛选后的主题 +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { + ch := make(chan interface{}, p.buffer) + p.m.Lock() + p.subscribers[ch] = topic + p.m.Unlock() + return ch +} + +//Evict 退出订阅 +func (p *Publisher) Evict(sub chan interface{}) { + p.m.Lock() + defer p.m.Unlock() + delete(p.subscribers, sub) + close(sub) +} + +//Publish 发布一个主题 +func (p *Publisher) Publish(v interface{}) { + p.m.RLock() + defer p.m.RUnlock() + var wg sync.WaitGroup + for sub, topic := range p.subscribers { + wg.Add(1) + go p.sendTopic(sub, topic, v, &wg) + } + wg.Wait() +} + +//Close 关闭发布者对象,同时关闭所有的订阅者管道。 +func (p *Publisher) Close() { + p.m.Lock() + defer p.m.Unlock() + for sub := range p.subscribers { + delete(p.subscribers, sub) + close(sub) + } +} + +//sendTopic 发送主题,可以容忍一定的超时 +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + select { + case sub <- v: + case <-time.After(p.timeout): + } +} diff --git a/gomore/01_messages/message_weather_test.go b/gomore/01_messages/message_weather_test.go new file mode 100644 index 0000000..158b638 --- /dev/null +++ b/gomore/01_messages/message_weather_test.go @@ -0,0 +1,43 @@ +package messaging + +import ( + "fmt" + "strings" + "testing" + "time" +) + +/*不同城市的象天气预报可以应用这个模式。*/ +//这个是一个第三方的例子 +func TestMessageSubPub(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + defer p.Close() + + //订阅全部 + all := p.SubscribeTopic(nil) + + //订阅包含天气的消息 + onlyweathers := p.SubscribeTopic(func(v interface{}) bool { + if s, ok := v.(string); ok { + return strings.Contains(s, "weather") + } + return false + }) + + p.Publish("weather bad, SH") + p.Publish("weather fine,SZ") + + go func() { + for msg := range all { + fmt.Println("all:", msg) + } + }() + + go func() { + for msg := range onlyweathers { + fmt.Println("Received:", msg) + } + }() + // 运行一定时间后退出 + time.Sleep(3 * time.Second) +}