From 928a133b00c2a873073e47ebb29a3e7b36e3d687 Mon Sep 17 00:00:00 2001 From: Edward Date: Fri, 22 May 2020 13:21:54 +0800 Subject: [PATCH] split counter and func implement --- .../06_circuit_breaker/circuit_breaker.go | 135 +----------------- .../06_circuit_breaker/circuit_conter.go | 72 ++++++++++ .../circuit_func_closure.go | 96 +++++++++++++ 3 files changed, 172 insertions(+), 131 deletions(-) create mode 100644 resiliency/06_circuit_breaker/circuit_conter.go create mode 100644 resiliency/06_circuit_breaker/circuit_func_closure.go diff --git a/resiliency/06_circuit_breaker/circuit_breaker.go b/resiliency/06_circuit_breaker/circuit_breaker.go index aefd92a..9578043 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker.go +++ b/resiliency/06_circuit_breaker/circuit_breaker.go @@ -9,7 +9,6 @@ package circuit import ( - "context" "errors" "fmt" "sync" @@ -29,7 +28,10 @@ var ( ) // 默认的超时时间 -const defaultTimeout = time.Second * 30 +const ( + defaultTimeout = time.Second * 30 + defaultSuccessThreshold = 2 +) //////////////////////////////// /// 状态计数器 用以维护断路器内部的状态 @@ -48,67 +50,6 @@ const ( StateOpen ) -//OperationState of current 某一次操作的结果状态 -type OperationState int - -//states of CircuitBreaker -//states: closed --->open ---> half open --> closed -const ( - UnknownState OperationState = iota - FailureState - SuccessState -) - -//ICounter interface -type ICounter interface { - Count(OperationState) - LastActivity() time.Time - Reset() - Total() uint32 -} - -type counters struct { - Requests uint32 //连续的请求次数 - lastState OperationState - lastActivity time.Time - counts uint32 //counts of failures - TotalFailures uint32 - TotalSuccesses uint32 - ConsecutiveSuccesses uint32 - ConsecutiveFailures uint32 -} - -func (c *counters) Total() uint32 { - return c.Requests -} - -func (c *counters) LastActivity() time.Time { - return c.lastActivity -} - -func (c *counters) Reset() { - ct := &counters{} - ct.lastActivity = c.lastActivity - ct.lastState = c.lastState - c = ct -} - -//Count the failure and success -func (c *counters) Count(statue OperationState) { - - switch statue { - case FailureState: - c.ConsecutiveFailures++ - case SuccessState: - c.ConsecutiveSuccesses++ - } - c.Requests++ - c.lastActivity = time.Now() //更新活动时间 - c.lastState = statue - //handle status change - -} - //////////////////////////////// //way 1 对象式断路器 //////////////////////////////// @@ -163,71 +104,3 @@ func (rb *RequestBreaker) Do(work func() (interface{}, error)) (interface{}, err return result, err } - -//////////////////////////////// -//way 2 简单的函数式断路器 -//////////////////////////////// - -//Circuit of action stream,this is actually to do something. -//Circuit hold the really action -type Circuit func(context.Context) error - -//Breaker return a closure wrapper to hold request,达到指定的失败次数后电路断开 -func Breaker(c Circuit, failureThreshold uint32) Circuit { - - //内部计数器 - cnt := counters{} - expired := time.Now() - currentState := StateClosed //默认是闭合状态 - - //ctx can be used hold parameters - return func(ctx context.Context) error { - - if cnt.ConsecutiveFailures >= failureThreshold { - - //断路器在half open状态下的控制逻辑 - canRetry := func(cnt counters) bool { - //间歇时间,多个线程时候会存在同步文件需要lock操作 - backoffLevel := cnt.ConsecutiveFailures - failureThreshold - // Calculates when should the circuit breaker resume propagating requests - // to the service - shouldRetryAt := cnt.LastActivity().Add(time.Second << backoffLevel) - return time.Now().After(shouldRetryAt) - } - - //如果仍然不能执行,直接返回失败 - if !canRetry(cnt) { - // Fails fast instead of propagating requests to the circuit since - // not enough time has passed since the last failure to retry - return ErrServiceUnavailable - } - } - - // 可以执行,则执行,并累计成功和失败次数 - // Unless the failure threshold is exceeded the wrapped service mimics the - // old behavior and the difference in behavior is seen after consecutive failures - // do the job - - switch currentState { - case StateOpen: - if time.Now().Before(expired) { - currentState = StateHalfOpen //转为半开 - } - return ErrServiceUnavailable - case StateClosed: - case StateHalfOpen: - - } - - if err := c(ctx); err != nil { - //统计状态 - cnt.Count(FailureState) - - return err - } - - //统计成功状态 - cnt.Count(SuccessState) - return nil - } -} diff --git a/resiliency/06_circuit_breaker/circuit_conter.go b/resiliency/06_circuit_breaker/circuit_conter.go new file mode 100644 index 0000000..7ca1bd7 --- /dev/null +++ b/resiliency/06_circuit_breaker/circuit_conter.go @@ -0,0 +1,72 @@ +/* + * @Description: https://github.com/crazybber + * @Author: Edward + * @Date: 2020-05-22 12:41:54 + * @Last Modified by: Edward + * @Last Modified time: 2020-05-22 12:41:54 + */ + +package circuit + +import "time" + +//OperationState of current 某一次操作的结果状态 +type OperationState int + +//states of CircuitBreaker +//states: closed --->open ---> half open --> closed +const ( + UnknownState OperationState = iota + FailureState + SuccessState +) + +//ICounter interface +type ICounter interface { + Count(OperationState) + LastActivity() time.Time + Reset() + Total() uint32 +} + +type counters struct { + Requests uint32 //连续的请求次数 + lastState OperationState + lastActivity time.Time + counts uint32 //counts of failures + TotalFailures uint32 + TotalSuccesses uint32 + ConsecutiveSuccesses uint32 + ConsecutiveFailures uint32 +} + +func (c *counters) Total() uint32 { + return c.Requests +} + +func (c *counters) LastActivity() time.Time { + return c.lastActivity +} + +func (c *counters) Reset() { + ct := &counters{} + ct.lastActivity = c.lastActivity + ct.lastState = c.lastState + c = ct +} + +//Count the failure and success +func (c *counters) Count(statue OperationState) { + + switch statue { + case FailureState: + c.ConsecutiveFailures++ + case SuccessState: + c.ConsecutiveSuccesses++ + } + c.Requests++ + c.lastActivity = time.Now() //更新活动时间 + c.lastState = statue + //handle status change + +} diff --git a/resiliency/06_circuit_breaker/circuit_func_closure.go b/resiliency/06_circuit_breaker/circuit_func_closure.go new file mode 100644 index 0000000..c7cb7c6 --- /dev/null +++ b/resiliency/06_circuit_breaker/circuit_func_closure.go @@ -0,0 +1,96 @@ +/* + * @Description: https://github.com/crazybber + * @Author: Edward + * @Date: 2020-05-22 12:42:34 + * @Last Modified by: Edward + * @Last Modified time: 2020-05-22 12:42:34 + */ + +package circuit + +import ( + "context" + "time" +) + +//////////////////////////////// +//way 2 简单的函数式断路器 +// 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问 +// 理论上讲也需要考虑同步问题 +//////////////////////////////// + +//Circuit of action stream,this is actually to do something. +//Circuit hold the really action +type Circuit func(context.Context) error + +//Breaker return a closure wrapper to hold request,达到指定的失败次数后电路断开 +func Breaker(c Circuit, failureThreshold uint32) Circuit { + + //内部计数器 + cnt := counters{} + expired := time.Now() + currentState := StateClosed //默认是闭合状态 + + //ctx can be used hold parameters + return func(ctx context.Context) error { + + if cnt.ConsecutiveFailures >= failureThreshold { + + //断路器在half open状态下的控制逻辑 + canRetry := func(cnt counters) bool { + //间歇时间,多个线程时候会存在同步文件需要lock操作 + backoffLevel := cnt.ConsecutiveFailures - failureThreshold + // Calculates when should the circuit breaker resume propagating requests + // to the service + shouldRetryAt := cnt.LastActivity().Add(time.Second << backoffLevel) + return time.Now().After(shouldRetryAt) + } + + //如果仍然不能执行,直接返回失败 + if !canRetry(cnt) { + // Fails fast instead of propagating requests to the circuit since + // not enough time has passed since the last failure to retry + return ErrServiceUnavailable + } + } + + // 可以执行,则执行,并累计成功和失败次数 + // Unless the failure threshold is exceeded the wrapped service mimics the + // old behavior and the difference in behavior is seen after consecutive failures + // do the job + + //handle statue transformation for timeout + if currentState == StateOpen { + nowt := time.Now() + if expired.Before(nowt) || expired.Equal(nowt) { + currentState = StateHalfOpen //端开状态的计时器过期了,转为半开 + cnt.ConsecutiveSuccesses = 0 //重置用于累计成功调用的计数器 + } + } + + switch currentState { + case StateOpen: + return ErrServiceUnavailable //直接失败 + case StateHalfOpen: + if err := c(ctx); err != nil { + //统计状态 + cnt.Count(FailureState) + currentState = StateOpen + expired = time.Now().Add(defaultTimeout) //Reset默认的超时时间 + return err + } + //统计成功状态 + cnt.Count(SuccessState) + //处理状态转换 + if cnt.ConsecutiveSuccesses > defaultSuccessThreshold { + currentState = StateClosed + cnt.ConsecutiveFailures = 0 + } + return nil + + case StateClosed: + + } + return nil + } +}