diff --git a/resiliency/06_circuit_breaker/breaker_options.go b/resiliency/06_circuit_breaker/breaker_options.go index 5bd1497..dad1d2a 100644 --- a/resiliency/06_circuit_breaker/breaker_options.go +++ b/resiliency/06_circuit_breaker/breaker_options.go @@ -2,13 +2,22 @@ package circuit import "time" +//BreakConditionWatcher check state +type BreakConditionWatcher func(counts counters) bool + +//StateChangedEventHandler set event handle +type StateChangedEventHandler func(name string, from State, to State) + +//Option set Options +type Option func(opts *Options) + //Options for breaker type Options struct { Name string Expiry time.Time Interval, Timeout time.Duration MaxRequests uint32 - ReadyToTrip StateCheckerHandler + WhenToBreak BreakConditionWatcher //是否应该断开电路(打开电路开关) OnStateChanged StateChangedEventHandler } @@ -58,9 +67,9 @@ func OnStateChanged(handler StateChangedEventHandler) Option { } } -//ReadyToTrip check traffic state ,to see if request can go -func ReadyToTrip(readyToGo StateCheckerHandler) Option { +//BreakIf check traffic state ,to see if request can go +func BreakIf(whenCondition BreakConditionWatcher) Option { return func(opts *Options) { - opts.ReadyToTrip = readyToGo + opts.WhenToBreak = whenCondition } } diff --git a/resiliency/06_circuit_breaker/circuit_breaker.go b/resiliency/06_circuit_breaker/circuit_breaker.go index 3d08f8e..bfa5318 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker.go +++ b/resiliency/06_circuit_breaker/circuit_breaker.go @@ -5,12 +5,13 @@ package circuit * @Author: Edward * @Date: 2020-05-10 22:00:58 * @Last Modified by: Edward - * @Last Modified time: 2020-05-11 22:15:25 + * @Last Modified time: 2020-05-21 15:59:40 */ import ( "context" "errors" + "fmt" "sync" "time" ) @@ -24,61 +25,14 @@ import ( var ( ErrTooManyRequests = errors.New("too many requests") ErrServiceUnavailable = errors.New("service unavailable") - FailureThreshold = 10 + FailureThreshold = 10 //最大失败次数--->失败阈值 ) -//StateCheckerHandler check state -type StateCheckerHandler func(counts counters) bool - -//StateChangedEventHandler set event handle -type StateChangedEventHandler func(name string, from State, to State) - -//Option set Options -type Option func(opts *Options) - -//RequestBreaker for protection -type RequestBreaker struct { - options Options - mutex sync.Mutex - state State - generation uint64 - counts ICounter -} - -//NewRequestBreaker return a breaker -func NewRequestBreaker(opts ...Option) *RequestBreaker { - - defaultOptions := Options{ - Name: "defaultBreakerName", - Expiry: time.Now().Add(time.Second * 20), - Interval: time.Second * 2, - Timeout: time.Second * 60, //default to 60 seconds - MaxRequests: 5, - ReadyToTrip: func(counts counters) bool { return true }, - OnStateChanged: func(name string, from State, to State) {}, - } - - for _, setOption := range opts { - setOption(&defaultOptions) - - } - - return &RequestBreaker{ - options: defaultOptions, - counts: nil, - generation: 0, - } -} - -// Do the given requested work if the RequestBreaker accepts it. -// Do returns an error instantly if the RequestBreaker rejects the request. -// Otherwise, Execute returns the result of the request. -// If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again. -func (rb *RequestBreaker) Do(work func() (interface{}, error)) (interface{}, error) { - //do work from requested user - result, err := work() - return result, err -} +//////////////////////////////// +/// 状态计数器 用以维护断路器内部的状态 +/// 无论是对象式断路器还是函数式断路器 +/// 都要用到计数器 +//////////////////////////////// //State of current switch type State int @@ -90,18 +44,16 @@ const ( SuccessState ) -//Circuit of action stream -type Circuit func(context.Context) error - //ICounter interface type ICounter interface { Count(State) LastActivity() time.Time Reset() + Total() uint32 } type counters struct { - Requests uint32 + Requests uint32 //连续的请求次数 lastState State lastActivity time.Time counts uint32 //counts of failures @@ -111,12 +63,19 @@ type counters struct { ConsecutiveFailures uint32 } +func (c *counters) Total() uint32 { + return c.Requests +} + func (c *counters) LastActivity() time.Time { return c.lastActivity } func (c *counters) Reset() { - + ct := &counters{} + ct.lastActivity = c.lastActivity + ct.lastState = c.lastState + c = ct } //Count the failure and success @@ -129,31 +88,98 @@ func (c *counters) Count(statue State) { c.ConsecutiveSuccesses++ } c.Requests++ + c.lastActivity = time.Now() //更新活动时间 c.lastState = statue + //fire event here + } -//WrapperBreaker return a Wrapper to hold request -func WrapperBreaker(c Circuit, failureThreshold uint32) Circuit { +//////////////////////////////// +//way 1 对象式断路器 +//////////////////////////////// + +//RequestBreaker for protection +type RequestBreaker struct { + options Options + mutex sync.Mutex + state State //断路器的当前状态 + generation uint64 + counts ICounter +} + +//NewRequestBreaker return a breaker +func NewRequestBreaker(opts ...Option) *RequestBreaker { + + defaultOptions := Options{ + Name: "defaultBreakerName", + Expiry: time.Now().Add(time.Second * 20), + Interval: time.Second * 2, // interval to check status + Timeout: time.Second * 60, //default to 60 seconds + MaxRequests: 5, + WhenToBreak: func(counts counters) bool { return counts.ConsecutiveFailures > 2 }, + OnStateChanged: func(name string, fromPre State, toCurrent State) {}, + } + + for _, setOption := range opts { + setOption(&defaultOptions) + + } + + return &RequestBreaker{ + options: defaultOptions, + counts: &counters{}, + generation: 0, + } +} + +// Do the given requested work if the RequestBreaker accepts it. +// Do returns an error instantly if the RequestBreaker rejects the request. +// Otherwise, Execute returns the result of the request. +// If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again. +func (rb *RequestBreaker) Do(work func() (interface{}, error)) (interface{}, error) { + + //before + fmt.Println("before do : request:", rb.counts.Total()) + + //do work from requested user + result, err := work() + + fmt.Println("after do : request:", rb.counts.Total()) + + return result, err +} + +//////////////////////////////// +//way 2 简单的函数式断路器 +//////////////////////////////// + +//Circuit of action stream,this is actually to do something. +//Circuit hold the really action +type Circuit func(context.Context) error + +//Breaker return a closure wrapper to hold request,达到指定的失败次数后电路断开 +func Breaker(c Circuit, failureThreshold uint32) Circuit { //内部计数器 cnt := counters{} + //ctx can be used hold parameters return func(ctx context.Context) error { if cnt.ConsecutiveFailures >= failureThreshold { canRetry := func(cnt counters) bool { - + //间歇时间,多个线程时候会存在同步文件需要lock操作 backoffLevel := cnt.ConsecutiveFailures - failureThreshold - // Calculates when should the circuit breaker resume propagating requests // to the service - shouldRetryAt := cnt.LastActivity().Add(time.Second * 2 << backoffLevel) - + backoffDuration := time.Second << backoffLevel + shouldRetryAt := cnt.LastActivity().Add(backoffDuration) return time.Now().After(shouldRetryAt) } + //如果仍然不能执行,直接返回失败 if !canRetry(cnt) { // Fails fast instead of propagating requests to the circuit since // not enough time has passed since the last failure to retry @@ -161,13 +187,17 @@ func WrapperBreaker(c Circuit, failureThreshold uint32) Circuit { } } + // 可以执行,则执行,并累计失败次数 // Unless the failure threshold is exceeded the wrapped service mimics the // old behavior and the difference in behavior is seen after consecutive failures + // do the job if err := c(ctx); err != nil { + //统计状态 cnt.Count(FailureState) return err } + //统计成功状态 cnt.Count(SuccessState) return nil } diff --git a/resiliency/06_circuit_breaker/circuit_breaker_test.go b/resiliency/06_circuit_breaker/circuit_breaker_test.go index fee5661..10bc9dd 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker_test.go +++ b/resiliency/06_circuit_breaker/circuit_breaker_test.go @@ -3,12 +3,13 @@ * @Author: Edward * @Date: 2020-05-11 10:55:28 * @Last Modified by: Edward - * @Last Modified time: 2020-05-11 21:35:39 + * @Last Modified time: 2020-05-21 14:08:53 */ package circuit import ( + "context" "fmt" "io/ioutil" "net/http" @@ -19,15 +20,7 @@ var breaker *RequestBreaker func TestBasicBreaker(t *testing.T) { - readyToTrip := func(counts counters) bool { - //失败率,可以由用户自己定义 - failureRatio := float64(counts.TotalFailures) / float64(counts.Requests) - return counts.Requests >= 3 && failureRatio >= 0.6 - } - - breaker = NewRequestBreaker(Name("HTTP GET"), ReadyToTrip(readyToTrip)) - - body, err := breaker.Do(func() (interface{}, error) { + jobToDo := func() (interface{}, error) { resp, err := http.Get("https://bing.com/robots.txt") if err != nil { return nil, err @@ -38,10 +31,47 @@ func TestBasicBreaker(t *testing.T) { return nil, err } return body, nil - }) + } + + whenCondition := func(counts counters) bool { + //失败率,可以由用户自己定义 + failureRatio := float64(counts.TotalFailures) / float64(counts.Requests) + return counts.Requests >= 3 && failureRatio >= 0.6 + } + + breaker = NewRequestBreaker(Name("HTTP GET"), BreakIf(whenCondition)) + + body, err := breaker.Do(jobToDo) if err != nil { t.Fatal(err) } fmt.Println(string(body.([]byte))) } + +func TestFunctionalBreaker(t *testing.T) { + + //something need to do + jobToDo := func(ctx context.Context) error { + resp, err := http.Get("https://bing.com/robots.txt") + if err != nil { + return err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + fmt.Println(string(body)) + return nil + } + + //wrapper and control job with a breaker + circuitWork := Breaker(jobToDo, 2 /* failureThreshold */) + + params := context.TODO() + + // do the job as usually + circuitWork(params) + +} diff --git a/resiliency/06_circuit_breaker/gobreaker/gobreaker.go b/resiliency/06_circuit_breaker/gobreaker/gobreaker.go index f402217..9ea7947 100644 --- a/resiliency/06_circuit_breaker/gobreaker/gobreaker.go +++ b/resiliency/06_circuit_breaker/gobreaker/gobreaker.go @@ -176,6 +176,7 @@ func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker { const defaultInterval = time.Duration(0) * time.Second const defaultTimeout = time.Duration(60) * time.Second +//5 Consecutive Failures will break func defaultReadyToTrip(counts Counts) bool { return counts.ConsecutiveFailures > 5 }