From 477910bf8d2983ff72c848fb9f35567ee9dcd2ca Mon Sep 17 00:00:00 2001 From: Edward Date: Tue, 28 Apr 2020 22:48:25 +0800 Subject: [PATCH] update messaging codes --- concurrency/rate_limiting_test.go | 105 ------------------ .../27_messages/{messaging.go => message.go} | 5 + gomore/27_messages/message_test.go | 15 +++ gomore/27_messages/messaging_test.go | 8 -- gomore/README.md | 2 + gomore/semaphore/semaphore.go | 49 ++++++++ main.go | 17 +++ syncs/README.md | 3 - 8 files changed, 88 insertions(+), 116 deletions(-) delete mode 100644 concurrency/rate_limiting_test.go rename gomore/27_messages/{messaging.go => message.go} (93%) create mode 100644 gomore/27_messages/message_test.go delete mode 100644 gomore/27_messages/messaging_test.go create mode 100644 gomore/semaphore/semaphore.go create mode 100644 main.go delete mode 100644 syncs/README.md diff --git a/concurrency/rate_limiting_test.go b/concurrency/rate_limiting_test.go deleted file mode 100644 index f58131a..0000000 --- a/concurrency/rate_limiting_test.go +++ /dev/null @@ -1,105 +0,0 @@ -package concurrency - -import ( - "fmt" - "testing" - "time" -) - -/* - -Rate limiting is an very important mechanism -With limiting you can controll resource utilization and maintain quality of service. -Go supports rate limiting by using goroutines, channels, and tickers. -*/ - -func TestRateLimiting(t *testing.T) { - requests := make(chan int, 5) - for i := 1; i <= 5; i++ { - requests <- i - } - close(requests) - - limiter := time.Tick(200 * time.Millisecond) - - for req := range requests { - <-limiter - t.Log("Sev request by 2000 Millisecond", req, time.Now()) - } - - burstyLimiter := make(chan struct{}, 3) - - //init burstyLimiter - for i := 0; i < 3; i++ { - burstyLimiter <- struct{}{} - } - go func() { - for { - select { - case <-time.Tick(200 * time.Millisecond): - burstyLimiter <- struct{}{} - } - - } - }() - - //max request queue - burstyRequestsQueue := make(chan int, 5) - for i := 1; i <= 5; i++ { - burstyRequestsQueue <- i - } - close(burstyRequestsQueue) - - for req := range burstyRequestsQueue { - <-burstyLimiter - if len(burstyLimiter) > 0 { - fmt.Println("working current in bursting status!") - } else { - fmt.Println("working current in normal status!") - } - fmt.Println("request handled", req, time.Now()) - } - - rateLimiting() - -} - -func rateLimiting() { - - requests := make(chan int, 5) - for i := 1; i <= 5; i++ { - requests <- i - } - close(requests) - - limiter := time.Tick(200 * time.Millisecond) - - for req := range requests { - <-limiter - fmt.Println("request", req, time.Now()) - } - - //突发限流器 - burstyLimiter := make(chan time.Time, 3) - - for i := 0; i < 3; i++ { - burstyLimiter <- time.Now() - } - - go func() { - for t := range time.Tick(200 * time.Millisecond) { - burstyLimiter <- t - } - }() - - //请求队列 - burstyRequests := make(chan int, 5) - for i := 1; i <= 5; i++ { - burstyRequests <- i - } - close(burstyRequests) - for req := range burstyRequests { - <-burstyLimiter - fmt.Println("request", req, time.Now()) - } -} diff --git a/gomore/27_messages/messaging.go b/gomore/27_messages/message.go similarity index 93% rename from gomore/27_messages/messaging.go rename to gomore/27_messages/message.go index 060ce9b..df860f0 100644 --- a/gomore/27_messages/messaging.go +++ b/gomore/27_messages/message.go @@ -48,6 +48,11 @@ type Topic struct { MessageHistory []Message //当前主题的消息历史,实际项目中需要限定大小并设置过期时间 } +//MesssageQueue of manager all topics +type MesssageQueue struct { + Topics map[uint64]*Topic +} + //String remove Subscription func (t *Topic) String() string { return t.Name diff --git a/gomore/27_messages/message_test.go b/gomore/27_messages/message_test.go new file mode 100644 index 0000000..6f1b26f --- /dev/null +++ b/gomore/27_messages/message_test.go @@ -0,0 +1,15 @@ +package gomore + +import "testing" + +func TestMessageSubAndPub(t *testing.T) { + + //创建一个队列 + msgQueue := MesssageQueue{Topics: map[uint64]*Topic{}} + + //创建一个话题 + topic := Topic{} + + //像队列订阅话题 + topic.Subscribe(123) +} diff --git a/gomore/27_messages/messaging_test.go b/gomore/27_messages/messaging_test.go deleted file mode 100644 index b6d74ab..0000000 --- a/gomore/27_messages/messaging_test.go +++ /dev/null @@ -1,8 +0,0 @@ -package gomore - -import "testing" - -func TestMessageSubAndPub(t *testing.T) { - topic := Topic{} - topic.Subscribe(123) -} diff --git a/gomore/README.md b/gomore/README.md index 01b94bd..ccb76dd 100644 --- a/gomore/README.md +++ b/gomore/README.md @@ -1,3 +1,5 @@ # go more 基于go的语言特性,在go的领域可以更容易的实现更多的高效并且有趣的模式. + +同步并发处理中的一些常用模式 \ No newline at end of file diff --git a/gomore/semaphore/semaphore.go b/gomore/semaphore/semaphore.go new file mode 100644 index 0000000..82f53e3 --- /dev/null +++ b/gomore/semaphore/semaphore.go @@ -0,0 +1,49 @@ +package semaphore + +import ( + "errors" + "time" +) + +var ( + ErrNoTickets = errors.New("semaphore: could not aquire semaphore") + ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first") +) + +// Interface contains the behavior of a semaphore that can be acquired and/or released. +type Interface interface { + Acquire() error + Release() error +} + +type implementation struct { + sem chan struct{} + timeout time.Duration +} + +func (s *implementation) Acquire() error { + select { + case s.sem <- struct{}{}: + return nil + case <-time.After(s.timeout): + return ErrNoTickets + } +} + +func (s *implementation) Release() error { + select { + case _ = <-s.sem: + return nil + case <-time.After(s.timeout): + return ErrIllegalRelease + } + + return nil +} + +func New(tickets int, timeout time.Duration) Interface { + return &implementation{ + sem: make(chan struct{}, tickets), + timeout: timeout, + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..9443943 --- /dev/null +++ b/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + + "github.com/logrusorgru/aurora" //这是一个控制台可以多种颜色输出的颜色库 +) + +func main() { + + startGo := letsGo() + fmt.Sprintln(aurora.Green(startGo)) +} + +func letsGo() string { + return fmt.Sprintln("start go!") +} diff --git a/syncs/README.md b/syncs/README.md deleted file mode 100644 index ff76b98..0000000 --- a/syncs/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# 同步模式 - -s同步处理中的一些常用模式 \ No newline at end of file