add circuit_breaker /rate_limiting

This commit is contained in:
Edward 2020-04-28 22:48:09 +08:00
parent 488d9790fa
commit bce45c2379
2 changed files with 162 additions and 0 deletions

View File

@ -0,0 +1,57 @@
package circuit
import (
"context"
"time"
)
type State int
const (
UnknownState State = iota
FailureState
SuccessState
)
type Counter interface {
Count(State)
ConsecutiveFailures() uint32
LastActivity() time.Time
Reset()
}
type Circuit func(context.Context) error
func Breaker(c Circuit, failureThreshold uint32) Circuit {
cnt := NewCounter()
return func(ctx context) error {
if cnt.ConsecutiveFailures() >= failureThreshold {
canRetry := func(cnt Counter) {
backoffLevel := Cnt.ConsecutiveFailures() - failureThreshold
// Calculates when should the circuit breaker resume propagating requests
// to the service
shouldRetryAt := cnt.LastActivity().Add(time.Seconds * 2 << backoffLevel)
return time.Now().After(shouldRetryAt)
}
if !canRetry(cnt) {
// Fails fast instead of propagating requests to the circuit since
// not enough time has passed since the last failure to retry
return ErrServiceUnavailable
}
}
// Unless the failure threshold is exceeded the wrapped service mimics the
// old behavior and the difference in behavior is seen after consecutive failures
if err := c(ctx); err != nil {
cnt.Count(FailureState)
return err
}
cnt.Count(SuccessState)
return nil
}
}

View File

@ -0,0 +1,105 @@
package ratelimit
import (
"fmt"
"testing"
"time"
)
/*
Rate limiting is an very important mechanism
With limiting you can controll resource utilization and maintain quality of service.
Go supports rate limiting by using goroutines, channels, and tickers.
*/
func TestRateLimiting(t *testing.T) {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
t.Log("Sev request by 2000 Millisecond", req, time.Now())
}
burstyLimiter := make(chan struct{}, 3)
//init burstyLimiter
for i := 0; i < 3; i++ {
burstyLimiter <- struct{}{}
}
go func() {
for {
select {
case <-time.Tick(200 * time.Millisecond):
burstyLimiter <- struct{}{}
}
}
}()
//max request queue
burstyRequestsQueue := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequestsQueue <- i
}
close(burstyRequestsQueue)
for req := range burstyRequestsQueue {
<-burstyLimiter
if len(burstyLimiter) > 0 {
fmt.Println("working current in bursting status!")
} else {
fmt.Println("working current in normal status!")
}
fmt.Println("request handled", req, time.Now())
}
rateLimiting()
}
func rateLimiting() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
fmt.Println("request", req, time.Now())
}
//突发限流器
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
//请求队列
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("request", req, time.Now())
}
}