update messaging codes

This commit is contained in:
Edward 2020-04-28 22:48:25 +08:00
parent bce45c2379
commit 477910bf8d
8 changed files with 88 additions and 116 deletions

View File

@ -1,105 +0,0 @@
package concurrency
import (
"fmt"
"testing"
"time"
)
/*
Rate limiting is an very important mechanism
With limiting you can controll resource utilization and maintain quality of service.
Go supports rate limiting by using goroutines, channels, and tickers.
*/
func TestRateLimiting(t *testing.T) {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
t.Log("Sev request by 2000 Millisecond", req, time.Now())
}
burstyLimiter := make(chan struct{}, 3)
//init burstyLimiter
for i := 0; i < 3; i++ {
burstyLimiter <- struct{}{}
}
go func() {
for {
select {
case <-time.Tick(200 * time.Millisecond):
burstyLimiter <- struct{}{}
}
}
}()
//max request queue
burstyRequestsQueue := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequestsQueue <- i
}
close(burstyRequestsQueue)
for req := range burstyRequestsQueue {
<-burstyLimiter
if len(burstyLimiter) > 0 {
fmt.Println("working current in bursting status!")
} else {
fmt.Println("working current in normal status!")
}
fmt.Println("request handled", req, time.Now())
}
rateLimiting()
}
func rateLimiting() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
fmt.Println("request", req, time.Now())
}
//突发限流器
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
//请求队列
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("request", req, time.Now())
}
}

View File

@ -48,6 +48,11 @@ type Topic struct {
MessageHistory []Message //当前主题的消息历史,实际项目中需要限定大小并设置过期时间 MessageHistory []Message //当前主题的消息历史,实际项目中需要限定大小并设置过期时间
} }
//MesssageQueue of manager all topics
type MesssageQueue struct {
Topics map[uint64]*Topic
}
//String remove Subscription //String remove Subscription
func (t *Topic) String() string { func (t *Topic) String() string {
return t.Name return t.Name

View File

@ -0,0 +1,15 @@
package gomore
import "testing"
func TestMessageSubAndPub(t *testing.T) {
//创建一个队列
msgQueue := MesssageQueue{Topics: map[uint64]*Topic{}}
//创建一个话题
topic := Topic{}
//像队列订阅话题
topic.Subscribe(123)
}

View File

@ -1,8 +0,0 @@
package gomore
import "testing"
func TestMessageSubAndPub(t *testing.T) {
topic := Topic{}
topic.Subscribe(123)
}

View File

@ -1,3 +1,5 @@
# go more # go more
基于go的语言特性,在go的领域可以更容易的实现更多的高效并且有趣的模式. 基于go的语言特性,在go的领域可以更容易的实现更多的高效并且有趣的模式.
同步并发处理中的一些常用模式

View File

@ -0,0 +1,49 @@
package semaphore
import (
"errors"
"time"
)
var (
ErrNoTickets = errors.New("semaphore: could not aquire semaphore")
ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)
// Interface contains the behavior of a semaphore that can be acquired and/or released.
type Interface interface {
Acquire() error
Release() error
}
type implementation struct {
sem chan struct{}
timeout time.Duration
}
func (s *implementation) Acquire() error {
select {
case s.sem <- struct{}{}:
return nil
case <-time.After(s.timeout):
return ErrNoTickets
}
}
func (s *implementation) Release() error {
select {
case _ = <-s.sem:
return nil
case <-time.After(s.timeout):
return ErrIllegalRelease
}
return nil
}
func New(tickets int, timeout time.Duration) Interface {
return &implementation{
sem: make(chan struct{}, tickets),
timeout: timeout,
}
}

17
main.go Normal file
View File

@ -0,0 +1,17 @@
package main
import (
"fmt"
"github.com/logrusorgru/aurora" //这是一个控制台可以多种颜色输出的颜色库
)
func main() {
startGo := letsGo()
fmt.Sprintln(aurora.Green(startGo))
}
func letsGo() string {
return fmt.Sprintln("start go!")
}

View File

@ -1,3 +0,0 @@
# 同步模式
s同步处理中的一些常用模式