From c2491af3d4d66e0ff4eb9f1a9bb4d1f8dd7cf004 Mon Sep 17 00:00:00 2001 From: Edward Date: Fri, 22 May 2020 15:31:28 +0800 Subject: [PATCH] update codes for circuit_breaker --- .../06_circuit_breaker/circuit_breaker.go | 19 +---- .../06_circuit_breaker/circuit_conter.go | 27 ++++--- .../circuit_func_closure.go | 79 +++++++------------ 3 files changed, 46 insertions(+), 79 deletions(-) diff --git a/resiliency/06_circuit_breaker/circuit_breaker.go b/resiliency/06_circuit_breaker/circuit_breaker.go index 9578043..4b36cc8 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker.go +++ b/resiliency/06_circuit_breaker/circuit_breaker.go @@ -3,7 +3,7 @@ * @Author: Edward * @Date: 2020-05-10 22:00:58 * @Last Modified by: Edward - * @Last Modified time: 2020-05-21 17:41:46 + * @Last Modified time: 2020-05-22 14:22:42 */ package circuit @@ -33,23 +33,6 @@ const ( defaultSuccessThreshold = 2 ) -//////////////////////////////// -/// 状态计数器 用以维护断路器内部的状态 -/// 无论是对象式断路器还是函数式断路器 -/// 都要用到计数器 -//////////////////////////////// - -//State 断路器本身的状态的 -//State of switch int -type State int - -// state for breaker -const ( - StateClosed State = iota //默认的闭合状态,可以正常执行业务 - StateHalfOpen - StateOpen -) - //////////////////////////////// //way 1 对象式断路器 //////////////////////////////// diff --git a/resiliency/06_circuit_breaker/circuit_conter.go b/resiliency/06_circuit_breaker/circuit_conter.go index 7ca1bd7..c048c31 100644 --- a/resiliency/06_circuit_breaker/circuit_conter.go +++ b/resiliency/06_circuit_breaker/circuit_conter.go @@ -3,24 +3,33 @@ * @Author: Edward * @Date: 2020-05-22 12:41:54 * @Last Modified by: Edward - * @Last Modified time: 2020-05-22 12:41:54 + * @Last Modified time: 2020-05-22 14:21:00 */ package circuit import "time" +//////////////////////////////// +/// 计数器 用以维护断路器内部的状态 +/// 无论是对象式断路器还是函数式断路器 +/// 都要用到计数器 +//////////////////////////////// + +//State 断路器本身的状态的 +//State of switch int +type State int + +// state for breaker +const ( + StateClosed State = iota //默认的闭合状态,可以正常执行业务 + StateHalfOpen + StateOpen +) + //OperationState of current 某一次操作的结果状态 type OperationState int -//states of CircuitBreaker -//states: closed --->open ---> half open --> closed -const ( - UnknownState OperationState = iota - FailureState - SuccessState -) - //ICounter interface type ICounter interface { Count(OperationState) diff --git a/resiliency/06_circuit_breaker/circuit_func_closure.go b/resiliency/06_circuit_breaker/circuit_func_closure.go index c7cb7c6..0ede755 100644 --- a/resiliency/06_circuit_breaker/circuit_func_closure.go +++ b/resiliency/06_circuit_breaker/circuit_func_closure.go @@ -3,7 +3,7 @@ * @Author: Edward * @Date: 2020-05-22 12:42:34 * @Last Modified by: Edward - * @Last Modified time: 2020-05-22 12:42:34 + * @Last Modified time: 2020-05-22 14:35:00 */ package circuit @@ -17,80 +17,55 @@ import ( //way 2 简单的函数式断路器 // 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问 // 理论上讲也需要考虑同步问题 +// 当前的设计思路,非常简单: +// 1、不考虑三种状态之间的转换,只靠两种状态,电路打开与电路关闭 +// 2、当累计失败到达一定失败次数就端开请求(打开电路),并延迟一定的时间再允许请求 //////////////////////////////// +//states of CircuitBreaker +//states: closed --->open ---> half open --> closed +const ( + UnknownState OperationState = iota + FailureState + SuccessState +) + //Circuit of action stream,this is actually to do something. //Circuit hold the really action type Circuit func(context.Context) error -//Breaker return a closure wrapper to hold request,达到指定的失败次数后电路断开 +var canRetry = func(cnt counters, failureThreshold uint32) bool { + backoffLevel := cnt.ConsecutiveFailures - failureThreshold + // Calculates when should the circuit breaker resume propagating requests + // to the service + shouldRetryAt := cnt.LastActivity().Add(time.Second << backoffLevel) + return time.Now().After(shouldRetryAt) +} + +//Breaker return a closure wrapper to hold request func Breaker(c Circuit, failureThreshold uint32) Circuit { - //内部计数器 + //闭包内部的全局计数器 和状态标志 cnt := counters{} - expired := time.Now() - currentState := StateClosed //默认是闭合状态 //ctx can be used hold parameters return func(ctx context.Context) error { if cnt.ConsecutiveFailures >= failureThreshold { - - //断路器在half open状态下的控制逻辑 - canRetry := func(cnt counters) bool { - //间歇时间,多个线程时候会存在同步文件需要lock操作 - backoffLevel := cnt.ConsecutiveFailures - failureThreshold - // Calculates when should the circuit breaker resume propagating requests - // to the service - shouldRetryAt := cnt.LastActivity().Add(time.Second << backoffLevel) - return time.Now().After(shouldRetryAt) - } - - //如果仍然不能执行,直接返回失败 - if !canRetry(cnt) { + if !canRetry(cnt, failureThreshold) { // Fails fast instead of propagating requests to the circuit since // not enough time has passed since the last failure to retry return ErrServiceUnavailable } } - - // 可以执行,则执行,并累计成功和失败次数 // Unless the failure threshold is exceeded the wrapped service mimics the // old behavior and the difference in behavior is seen after consecutive failures - // do the job - - //handle statue transformation for timeout - if currentState == StateOpen { - nowt := time.Now() - if expired.Before(nowt) || expired.Equal(nowt) { - currentState = StateHalfOpen //端开状态的计时器过期了,转为半开 - cnt.ConsecutiveSuccesses = 0 //重置用于累计成功调用的计数器 - } + if err := c(ctx); err != nil { + cnt.Count(FailureState) + return err } - switch currentState { - case StateOpen: - return ErrServiceUnavailable //直接失败 - case StateHalfOpen: - if err := c(ctx); err != nil { - //统计状态 - cnt.Count(FailureState) - currentState = StateOpen - expired = time.Now().Add(defaultTimeout) //Reset默认的超时时间 - return err - } - //统计成功状态 - cnt.Count(SuccessState) - //处理状态转换 - if cnt.ConsecutiveSuccesses > defaultSuccessThreshold { - currentState = StateClosed - cnt.ConsecutiveFailures = 0 - } - return nil - - case StateClosed: - - } + cnt.Count(SuccessState) return nil } }