finish advanced circuit breaker

This commit is contained in:
eamon 2020-06-04 17:04:53 +08:00
parent c10a20aa5d
commit 4ba68f1cbd
4 changed files with 137 additions and 68 deletions

View File

@ -14,7 +14,7 @@ import (
) )
//BreakConditionWatcher check state //BreakConditionWatcher check state
type BreakConditionWatcher func(cnter counters) bool type BreakConditionWatcher func(state State, cnter counters) bool
//StateChangedEventHandler set event handle //StateChangedEventHandler set event handle
type StateChangedEventHandler func(name string, from State, to State) type StateChangedEventHandler func(name string, from State, to State)
@ -24,18 +24,19 @@ type Option func(opts *Options)
//Options for breaker //Options for breaker
type Options struct { type Options struct {
Name string Name string
Expiry time.Time Expiry time.Time
Interval, Timeout time.Duration Interval, Timeout time.Duration
MaxRequests uint32 MaxRequests uint32
WhenToBreak BreakConditionWatcher //是否应该断开电路(打开电路开关) CanOpen BreakConditionWatcher //是否应该断开电路(打开电路开关)
OnStateChanged StateChangedEventHandler CanClose BreakConditionWatcher //if we should close switch
ShoulderToOpen uint32 OnStateChanged StateChangedEventHandler
Ctx context.Context ShoulderHalfToOpen uint32
Ctx context.Context
} }
//Name of breaker //ActionName of breaker
func Name(name string) Option { func ActionName(name string) Option {
return func(opts *Options) { return func(opts *Options) {
opts.Name = name opts.Name = name
} }
@ -66,6 +67,13 @@ func MaxRequests(maxRequests uint32) Option {
} }
} }
//WithShoulderHalfToOpen of breaker
func WithShoulderHalfToOpen(shoulderHalfToOpen uint32) Option {
return func(opts *Options) {
opts.ShoulderHalfToOpen = shoulderHalfToOpen
}
}
//Expiry of breaker //Expiry of breaker
func Expiry(expiry time.Time) Option { func Expiry(expiry time.Time) Option {
return func(opts *Options) { return func(opts *Options) {
@ -80,9 +88,16 @@ func WithStateChanged(handler StateChangedEventHandler) Option {
} }
} }
//BreakIf check traffic state ,to see if request can go //WithBreakCondition check traffic state ,to see if request can go
func BreakIf(whenCondition BreakConditionWatcher) Option { func WithBreakCondition(whenCondition BreakConditionWatcher) Option {
return func(opts *Options) { return func(opts *Options) {
opts.WhenToBreak = whenCondition opts.CanOpen = whenCondition
}
}
//WithCloseCondition check traffic state ,to see if request can go
func WithCloseCondition(whenCondition BreakConditionWatcher) Option {
return func(opts *Options) {
opts.CanClose = whenCondition
} }
} }

View File

@ -42,11 +42,11 @@ const (
//RequestBreaker for protection //RequestBreaker for protection
type RequestBreaker struct { type RequestBreaker struct {
options Options options Options
mutex sync.Mutex mutex sync.Mutex
state State state State
generation uint64 cnter counters
cnter counters preState State
} }
//NewRequestBreaker return a breaker //NewRequestBreaker return a breaker
@ -55,10 +55,11 @@ func NewRequestBreaker(opts ...Option) *RequestBreaker {
defaultOptions := Options{ defaultOptions := Options{
Name: "defaultBreakerName", Name: "defaultBreakerName",
Expiry: time.Now().Add(time.Second * 20), Expiry: time.Now().Add(time.Second * 20),
Interval: time.Second * 2, // interval to check status Interval: time.Second * 10, // interval to check closed status,default 10 seconds
Timeout: time.Second * 60, //default to 60 seconds Timeout: time.Second * 60, //timeout to check open, default 60 seconds
MaxRequests: 5, MaxRequests: 5,
WhenToBreak: func(cnter counters) bool { return cnter.ConsecutiveFailures > 2 }, CanOpen: func(current State, cnter counters) bool { return cnter.ConsecutiveFailures > 2 },
CanClose: func(current State, cnter counters) bool { return cnter.ConsecutiveSuccesses > 2 },
OnStateChanged: func(name string, fromPre State, toCurrent State) {}, OnStateChanged: func(name string, fromPre State, toCurrent State) {},
} }
@ -68,74 +69,102 @@ func NewRequestBreaker(opts ...Option) *RequestBreaker {
} }
return &RequestBreaker{ return &RequestBreaker{
options: defaultOptions, options: defaultOptions,
cnter: counters{}, cnter: counters{},
generation: 0, state: StateUnknown,
preState: StateUnknown,
} }
} }
func (rb *RequestBreaker) changeStateTo(state State) {
rb.preState = rb.state
rb.state = state
rb.cnter.Reset()
}
func (rb *RequestBreaker) beforeRequest() error {
rb.mutex.Lock()
defer rb.mutex.Unlock()
fmt.Println("before do request:", rb.cnter.Total())
switch rb.state {
case StateOpen:
//如果是断开状态,并且超时了,转到半开状态,记录
if rb.options.Expiry.Before(time.Now()) {
rb.changeStateTo(StateHalfOpen)
rb.options.Expiry = time.Now().Add(rb.options.Timeout)
return nil
}
case StateClosed:
if rb.options.Expiry.Before(time.Now()) {
rb.cnter.Reset()
rb.options.Expiry = time.Now().Add(rb.options.Interval)
}
}
return ErrTooManyRequests
}
// Do the given requested work if the RequestBreaker accepts it. // Do the given requested work if the RequestBreaker accepts it.
// Do returns an error instantly if the RequestBreaker rejects the request. // Do returns an error instantly if the RequestBreaker rejects the request.
// Otherwise, Execute returns the result of the request. // Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again. // If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again.
func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)) (interface{}, error) { func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)) (interface{}, error) {
preState := StateUnknown
//before //before
fmt.Println("before do : request:", rb.cnter.Total())
rb.mutex.Lock()
//如果是断开状态,并且超时了,转到半开状态,记录 if err := rb.beforeRequest(); err != nil {
if rb.state == StateOpen && rb.options.Expiry.Before(time.Now()) { return nil, err
preState = rb.state
rb.state = StateHalfOpen
rb.cnter.Reset()
} else {
return nil, ErrTooManyRequests
} }
rb.mutex.Unlock()
//do work //do work
//do work from requested user //do work from requested user
result, err := work(rb.options.Ctx) result, err := work(rb.options.Ctx)
//after work
rb.afterRequest(err)
return result, err
}
func (rb *RequestBreaker) afterRequest(resultErr error) {
rb.mutex.Lock() rb.mutex.Lock()
//失败了 defer rb.mutex.Unlock()
if err != nil { //after
rb.cnter.Count(FailureState) fmt.Println("after do request:", rb.cnter.Total())
//如果是在半开状态下的失败,立即打开开关
if rb.state == StateHalfOpen { if resultErr != nil {
rb.state = StateOpen //转为打开 //失败了,handle 失败
rb.options.OnStateChanged(rb.options.Name, StateHalfOpen, StateOpen) //半开到打开 rb.cnter.Count(FailureState, rb.preState == rb.state)
} else if rb.state == StateClosed { switch rb.state {
if rb.options.WhenToBreak(rb.cnter) { case StateHalfOpen, StateClosed:
rb.state = StateOpen //打开开关 if rb.options.CanOpen(rb.state, rb.cnter) {
rb.options.OnStateChanged(rb.options.Name, StateClosed, StateOpen) //关闭到打开 rb.changeStateTo(StateOpen) //打开开关
rb.cnter.Reset() rb.options.OnStateChanged(rb.options.Name, rb.state, StateOpen) //关闭到打开
} }
} }
} else { } else {
//成功了. //success !
rb.cnter.Count(SuccessState) rb.cnter.Count(SuccessState, rb.preState == rb.state)
if rb.state == StateHalfOpen {
if preState == StateOpen { switch rb.state {
preState = StateHalfOpen case StateHalfOpen:
if rb.preState == StateOpen {
// rb.changeStateTo(StateHalfOpen) //already handled in beforeRequest,Only fire StateChange Event
rb.options.OnStateChanged(rb.options.Name, StateOpen, StateHalfOpen) //打开到半开 rb.options.OnStateChanged(rb.options.Name, StateOpen, StateHalfOpen) //打开到半开
} }
if rb.cnter.ConsecutiveSuccesses >= rb.options.ShoulderToOpen { if rb.cnter.ConsecutiveSuccesses >= rb.options.ShoulderHalfToOpen {
rb.state = StateClosed rb.changeStateTo(StateClosed)
rb.options.Expiry = time.Now().Add(rb.options.Interval)
rb.options.OnStateChanged(rb.options.Name, StateHalfOpen, StateClosed) //半开到关闭 rb.options.OnStateChanged(rb.options.Name, StateHalfOpen, StateClosed) //半开到关闭
} }
} }
} }
rb.mutex.Unlock()
//after
fmt.Println("after do : request:", rb.cnter.Total())
return result, err
} }

View File

@ -23,12 +23,27 @@ var onStateChangeEvent = func(name string, from, to State) {
fmt.Println("name:", name, "from:", from, "to", to) fmt.Println("name:", name, "from:", from, "to", to)
} }
var whenConditionOccurred = func(cnter counters) bool { var canOpenSwitch = func(current State, cnter counters) bool {
if current == StateHalfOpen {
return cnter.ConsecutiveFailures > 2
}
//失败率,可以由用户自己定义 //失败率,可以由用户自己定义
failureRatio := float64(cnter.TotalFailures) / float64(cnter.Requests) failureRatio := float64(cnter.TotalFailures) / float64(cnter.Requests)
return cnter.Requests >= 3 && failureRatio >= 0.6 return cnter.Requests >= 3 && failureRatio >= 0.6
} }
var canCloseSwitch = func(current State, cnter counters) bool {
//失败率,可以由用户自己定义
if cnter.ConsecutiveSuccesses > 2 {
return true
}
//
successRatio := float64(cnter.TotalFailures) / float64(cnter.Requests)
return cnter.Requests >= 3 && successRatio >= 0.6
}
func TestObjectBreaker(t *testing.T) { func TestObjectBreaker(t *testing.T) {
jobToDo := func(ctx context.Context) (interface{}, error) { jobToDo := func(ctx context.Context) (interface{}, error) {
@ -44,7 +59,11 @@ func TestObjectBreaker(t *testing.T) {
return body, nil return body, nil
} }
breaker = NewRequestBreaker(Name("HTTP GET"), BreakIf(whenConditionOccurred), WithStateChanged(onStateChangeEvent)) breaker = NewRequestBreaker(ActionName("HTTP GET"),
WithBreakCondition(canOpenSwitch),
WithCloseCondition(canCloseSwitch),
WithShoulderHalfToOpen(2),
WithStateChanged(onStateChangeEvent))
body, err := breaker.Do(jobToDo) body, err := breaker.Do(jobToDo)
if err != nil { if err != nil {

View File

@ -33,7 +33,7 @@ type OperationState int
//ICounter interface //ICounter interface
type ICounter interface { type ICounter interface {
Count(OperationState) Count(OperationState, bool)
LastActivity() time.Time LastActivity() time.Time
Reset() Reset()
Total() uint32 Total() uint32
@ -63,13 +63,19 @@ func (c *counters) Reset() {
} }
//Count the failure and success //Count the failure and success
func (c *counters) Count(statue OperationState) { func (c *counters) Count(statue OperationState, isConsecutive bool) {
switch statue { switch statue {
case FailureState: case FailureState:
c.ConsecutiveFailures++ c.TotalFailures++
if isConsecutive || c.ConsecutiveFailures == 0 {
c.ConsecutiveFailures++
}
case SuccessState: case SuccessState:
c.ConsecutiveSuccesses++ c.TotalSuccesses++
if isConsecutive || c.ConsecutiveSuccesses == 0 {
c.ConsecutiveSuccesses++
}
} }
c.Requests++ c.Requests++
c.lastActivity = time.Now() //更新活动时间 c.lastActivity = time.Now() //更新活动时间