diff --git a/resiliency/06_circuit_breaker/breaker_options.go b/resiliency/06_circuit_breaker/breaker_options.go index df5be30..5da637c 100644 --- a/resiliency/06_circuit_breaker/breaker_options.go +++ b/resiliency/06_circuit_breaker/breaker_options.go @@ -14,7 +14,7 @@ import ( ) //BreakConditionWatcher check state -type BreakConditionWatcher func(cnter counters) bool +type BreakConditionWatcher func(state State, cnter counters) bool //StateChangedEventHandler set event handle type StateChangedEventHandler func(name string, from State, to State) @@ -24,18 +24,19 @@ type Option func(opts *Options) //Options for breaker type Options struct { - Name string - Expiry time.Time - Interval, Timeout time.Duration - MaxRequests uint32 - WhenToBreak BreakConditionWatcher //是否应该断开电路(打开电路开关) - OnStateChanged StateChangedEventHandler - ShoulderToOpen uint32 - Ctx context.Context + Name string + Expiry time.Time + Interval, Timeout time.Duration + MaxRequests uint32 + CanOpen BreakConditionWatcher //是否应该断开电路(打开电路开关) + CanClose BreakConditionWatcher //if we should close switch + OnStateChanged StateChangedEventHandler + ShoulderHalfToOpen uint32 + Ctx context.Context } -//Name of breaker -func Name(name string) Option { +//ActionName of breaker +func ActionName(name string) Option { return func(opts *Options) { opts.Name = name } @@ -66,6 +67,13 @@ func MaxRequests(maxRequests uint32) Option { } } +//WithShoulderHalfToOpen of breaker +func WithShoulderHalfToOpen(shoulderHalfToOpen uint32) Option { + return func(opts *Options) { + opts.ShoulderHalfToOpen = shoulderHalfToOpen + } +} + //Expiry of breaker func Expiry(expiry time.Time) Option { return func(opts *Options) { @@ -80,9 +88,16 @@ func WithStateChanged(handler StateChangedEventHandler) Option { } } -//BreakIf check traffic state ,to see if request can go -func BreakIf(whenCondition BreakConditionWatcher) Option { +//WithBreakCondition check traffic state ,to see if request can go +func WithBreakCondition(whenCondition BreakConditionWatcher) Option { return func(opts *Options) { - opts.WhenToBreak = whenCondition + opts.CanOpen = whenCondition + } +} + +//WithCloseCondition check traffic state ,to see if request can go +func WithCloseCondition(whenCondition BreakConditionWatcher) Option { + return func(opts *Options) { + opts.CanClose = whenCondition } } diff --git a/resiliency/06_circuit_breaker/circuit_breaker_adv.go b/resiliency/06_circuit_breaker/circuit_breaker_adv.go index 0385959..437386c 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker_adv.go +++ b/resiliency/06_circuit_breaker/circuit_breaker_adv.go @@ -42,11 +42,11 @@ const ( //RequestBreaker for protection type RequestBreaker struct { - options Options - mutex sync.Mutex - state State - generation uint64 - cnter counters + options Options + mutex sync.Mutex + state State + cnter counters + preState State } //NewRequestBreaker return a breaker @@ -55,10 +55,11 @@ func NewRequestBreaker(opts ...Option) *RequestBreaker { defaultOptions := Options{ Name: "defaultBreakerName", Expiry: time.Now().Add(time.Second * 20), - Interval: time.Second * 2, // interval to check status - Timeout: time.Second * 60, //default to 60 seconds + Interval: time.Second * 10, // interval to check closed status,default 10 seconds + Timeout: time.Second * 60, //timeout to check open, default 60 seconds MaxRequests: 5, - WhenToBreak: func(cnter counters) bool { return cnter.ConsecutiveFailures > 2 }, + CanOpen: func(current State, cnter counters) bool { return cnter.ConsecutiveFailures > 2 }, + CanClose: func(current State, cnter counters) bool { return cnter.ConsecutiveSuccesses > 2 }, OnStateChanged: func(name string, fromPre State, toCurrent State) {}, } @@ -68,74 +69,102 @@ func NewRequestBreaker(opts ...Option) *RequestBreaker { } return &RequestBreaker{ - options: defaultOptions, - cnter: counters{}, - generation: 0, + options: defaultOptions, + cnter: counters{}, + state: StateUnknown, + preState: StateUnknown, } } +func (rb *RequestBreaker) changeStateTo(state State) { + rb.preState = rb.state + rb.state = state + rb.cnter.Reset() +} + +func (rb *RequestBreaker) beforeRequest() error { + + rb.mutex.Lock() + defer rb.mutex.Unlock() + fmt.Println("before do request:", rb.cnter.Total()) + + switch rb.state { + case StateOpen: + //如果是断开状态,并且超时了,转到半开状态,记录 + if rb.options.Expiry.Before(time.Now()) { + rb.changeStateTo(StateHalfOpen) + rb.options.Expiry = time.Now().Add(rb.options.Timeout) + return nil + } + case StateClosed: + if rb.options.Expiry.Before(time.Now()) { + rb.cnter.Reset() + rb.options.Expiry = time.Now().Add(rb.options.Interval) + } + + } + + return ErrTooManyRequests + +} + // Do the given requested work if the RequestBreaker accepts it. // Do returns an error instantly if the RequestBreaker rejects the request. // Otherwise, Execute returns the result of the request. // If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again. func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)) (interface{}, error) { - preState := StateUnknown - //before - fmt.Println("before do : request:", rb.cnter.Total()) - rb.mutex.Lock() - //如果是断开状态,并且超时了,转到半开状态,记录 - if rb.state == StateOpen && rb.options.Expiry.Before(time.Now()) { - preState = rb.state - rb.state = StateHalfOpen - rb.cnter.Reset() - } else { - return nil, ErrTooManyRequests + if err := rb.beforeRequest(); err != nil { + return nil, err } - rb.mutex.Unlock() - //do work //do work from requested user result, err := work(rb.options.Ctx) + //after work + rb.afterRequest(err) + + return result, err +} + +func (rb *RequestBreaker) afterRequest(resultErr error) { + rb.mutex.Lock() - //失败了 - if err != nil { - rb.cnter.Count(FailureState) - //如果是在半开状态下的失败,立即打开开关 - if rb.state == StateHalfOpen { - rb.state = StateOpen //转为打开 - rb.options.OnStateChanged(rb.options.Name, StateHalfOpen, StateOpen) //半开到打开 - } else if rb.state == StateClosed { - if rb.options.WhenToBreak(rb.cnter) { - rb.state = StateOpen //打开开关 - rb.options.OnStateChanged(rb.options.Name, StateClosed, StateOpen) //关闭到打开 - rb.cnter.Reset() + defer rb.mutex.Unlock() + //after + fmt.Println("after do request:", rb.cnter.Total()) + + if resultErr != nil { + //失败了,handle 失败 + rb.cnter.Count(FailureState, rb.preState == rb.state) + switch rb.state { + case StateHalfOpen, StateClosed: + if rb.options.CanOpen(rb.state, rb.cnter) { + rb.changeStateTo(StateOpen) //打开开关 + rb.options.OnStateChanged(rb.options.Name, rb.state, StateOpen) //关闭到打开 } } - } else { - //成功了. - rb.cnter.Count(SuccessState) - if rb.state == StateHalfOpen { - if preState == StateOpen { - preState = StateHalfOpen + //success ! + rb.cnter.Count(SuccessState, rb.preState == rb.state) + + switch rb.state { + case StateHalfOpen: + if rb.preState == StateOpen { + // rb.changeStateTo(StateHalfOpen) //already handled in beforeRequest,Only fire StateChange Event rb.options.OnStateChanged(rb.options.Name, StateOpen, StateHalfOpen) //打开到半开 } - if rb.cnter.ConsecutiveSuccesses >= rb.options.ShoulderToOpen { - rb.state = StateClosed + if rb.cnter.ConsecutiveSuccesses >= rb.options.ShoulderHalfToOpen { + rb.changeStateTo(StateClosed) + rb.options.Expiry = time.Now().Add(rb.options.Interval) rb.options.OnStateChanged(rb.options.Name, StateHalfOpen, StateClosed) //半开到关闭 } } } - rb.mutex.Unlock() - //after - fmt.Println("after do : request:", rb.cnter.Total()) - return result, err } diff --git a/resiliency/06_circuit_breaker/circuit_breaker_test.go b/resiliency/06_circuit_breaker/circuit_breaker_test.go index a2bf61e..d35468e 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker_test.go +++ b/resiliency/06_circuit_breaker/circuit_breaker_test.go @@ -23,12 +23,27 @@ var onStateChangeEvent = func(name string, from, to State) { fmt.Println("name:", name, "from:", from, "to", to) } -var whenConditionOccurred = func(cnter counters) bool { +var canOpenSwitch = func(current State, cnter counters) bool { + + if current == StateHalfOpen { + return cnter.ConsecutiveFailures > 2 + } + //失败率,可以由用户自己定义 failureRatio := float64(cnter.TotalFailures) / float64(cnter.Requests) return cnter.Requests >= 3 && failureRatio >= 0.6 } +var canCloseSwitch = func(current State, cnter counters) bool { + //失败率,可以由用户自己定义 + if cnter.ConsecutiveSuccesses > 2 { + return true + } + // + successRatio := float64(cnter.TotalFailures) / float64(cnter.Requests) + return cnter.Requests >= 3 && successRatio >= 0.6 +} + func TestObjectBreaker(t *testing.T) { jobToDo := func(ctx context.Context) (interface{}, error) { @@ -44,7 +59,11 @@ func TestObjectBreaker(t *testing.T) { return body, nil } - breaker = NewRequestBreaker(Name("HTTP GET"), BreakIf(whenConditionOccurred), WithStateChanged(onStateChangeEvent)) + breaker = NewRequestBreaker(ActionName("HTTP GET"), + WithBreakCondition(canOpenSwitch), + WithCloseCondition(canCloseSwitch), + WithShoulderHalfToOpen(2), + WithStateChanged(onStateChangeEvent)) body, err := breaker.Do(jobToDo) if err != nil { diff --git a/resiliency/06_circuit_breaker/circuit_counter.go b/resiliency/06_circuit_breaker/circuit_counter.go index 02e0191..fc1081f 100644 --- a/resiliency/06_circuit_breaker/circuit_counter.go +++ b/resiliency/06_circuit_breaker/circuit_counter.go @@ -33,7 +33,7 @@ type OperationState int //ICounter interface type ICounter interface { - Count(OperationState) + Count(OperationState, bool) LastActivity() time.Time Reset() Total() uint32 @@ -63,13 +63,19 @@ func (c *counters) Reset() { } //Count the failure and success -func (c *counters) Count(statue OperationState) { +func (c *counters) Count(statue OperationState, isConsecutive bool) { switch statue { case FailureState: - c.ConsecutiveFailures++ + c.TotalFailures++ + if isConsecutive || c.ConsecutiveFailures == 0 { + c.ConsecutiveFailures++ + } case SuccessState: - c.ConsecutiveSuccesses++ + c.TotalSuccesses++ + if isConsecutive || c.ConsecutiveSuccesses == 0 { + c.ConsecutiveSuccesses++ + } } c.Requests++ c.lastActivity = time.Now() //更新活动时间