update codes for circuit_breaker

This commit is contained in:
Edward 2020-05-22 15:31:28 +08:00
parent 928a133b00
commit c2491af3d4
3 changed files with 46 additions and 79 deletions

View File

@ -3,7 +3,7 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-10 22:00:58 * @Date: 2020-05-10 22:00:58
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-21 17:41:46 * @Last Modified time: 2020-05-22 14:22:42
*/ */
package circuit package circuit
@ -33,23 +33,6 @@ const (
defaultSuccessThreshold = 2 defaultSuccessThreshold = 2
) )
////////////////////////////////
/// 状态计数器 用以维护断路器内部的状态
/// 无论是对象式断路器还是函数式断路器
/// 都要用到计数器
////////////////////////////////
//State 断路器本身的状态的
//State of switch int
type State int
// state for breaker
const (
StateClosed State = iota //默认的闭合状态,可以正常执行业务
StateHalfOpen
StateOpen
)
//////////////////////////////// ////////////////////////////////
//way 1 对象式断路器 //way 1 对象式断路器
//////////////////////////////// ////////////////////////////////

View File

@ -3,24 +3,33 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-22 12:41:54 * @Date: 2020-05-22 12:41:54
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-22 12:41:54 * @Last Modified time: 2020-05-22 14:21:00
*/ */
package circuit package circuit
import "time" import "time"
////////////////////////////////
/// 计数器 用以维护断路器内部的状态
/// 无论是对象式断路器还是函数式断路器
/// 都要用到计数器
////////////////////////////////
//State 断路器本身的状态的
//State of switch int
type State int
// state for breaker
const (
StateClosed State = iota //默认的闭合状态,可以正常执行业务
StateHalfOpen
StateOpen
)
//OperationState of current 某一次操作的结果状态 //OperationState of current 某一次操作的结果状态
type OperationState int type OperationState int
//states of CircuitBreaker
//states: closed --->open ---> half open --> closed
const (
UnknownState OperationState = iota
FailureState
SuccessState
)
//ICounter interface //ICounter interface
type ICounter interface { type ICounter interface {
Count(OperationState) Count(OperationState)

View File

@ -3,7 +3,7 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-22 12:42:34 * @Date: 2020-05-22 12:42:34
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-22 12:42:34 * @Last Modified time: 2020-05-22 14:35:00
*/ */
package circuit package circuit
@ -17,80 +17,55 @@ import (
//way 2 简单的函数式断路器 //way 2 简单的函数式断路器
// 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问 // 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问
// 理论上讲也需要考虑同步问题 // 理论上讲也需要考虑同步问题
// 当前的设计思路,非常简单:
// 1、不考虑三种状态之间的转换只靠两种状态电路打开与电路关闭
// 2、当累计失败到达一定失败次数就端开请求(打开电路),并延迟一定的时间再允许请求
//////////////////////////////// ////////////////////////////////
//states of CircuitBreaker
//states: closed --->open ---> half open --> closed
const (
UnknownState OperationState = iota
FailureState
SuccessState
)
//Circuit of action stream,this is actually to do something. //Circuit of action stream,this is actually to do something.
//Circuit hold the really action //Circuit hold the really action
type Circuit func(context.Context) error type Circuit func(context.Context) error
//Breaker return a closure wrapper to hold request,达到指定的失败次数后电路断开 var canRetry = func(cnt counters, failureThreshold uint32) bool {
backoffLevel := cnt.ConsecutiveFailures - failureThreshold
// Calculates when should the circuit breaker resume propagating requests
// to the service
shouldRetryAt := cnt.LastActivity().Add(time.Second << backoffLevel)
return time.Now().After(shouldRetryAt)
}
//Breaker return a closure wrapper to hold request
func Breaker(c Circuit, failureThreshold uint32) Circuit { func Breaker(c Circuit, failureThreshold uint32) Circuit {
//内部计数器 //闭包内部的全局计数器 和状态标志
cnt := counters{} cnt := counters{}
expired := time.Now()
currentState := StateClosed //默认是闭合状态
//ctx can be used hold parameters //ctx can be used hold parameters
return func(ctx context.Context) error { return func(ctx context.Context) error {
if cnt.ConsecutiveFailures >= failureThreshold { if cnt.ConsecutiveFailures >= failureThreshold {
if !canRetry(cnt, failureThreshold) {
//断路器在half open状态下的控制逻辑
canRetry := func(cnt counters) bool {
//间歇时间多个线程时候会存在同步文件需要lock操作
backoffLevel := cnt.ConsecutiveFailures - failureThreshold
// Calculates when should the circuit breaker resume propagating requests
// to the service
shouldRetryAt := cnt.LastActivity().Add(time.Second << backoffLevel)
return time.Now().After(shouldRetryAt)
}
//如果仍然不能执行,直接返回失败
if !canRetry(cnt) {
// Fails fast instead of propagating requests to the circuit since // Fails fast instead of propagating requests to the circuit since
// not enough time has passed since the last failure to retry // not enough time has passed since the last failure to retry
return ErrServiceUnavailable return ErrServiceUnavailable
} }
} }
// 可以执行,则执行,并累计成功和失败次数
// Unless the failure threshold is exceeded the wrapped service mimics the // Unless the failure threshold is exceeded the wrapped service mimics the
// old behavior and the difference in behavior is seen after consecutive failures // old behavior and the difference in behavior is seen after consecutive failures
// do the job if err := c(ctx); err != nil {
cnt.Count(FailureState)
//handle statue transformation for timeout return err
if currentState == StateOpen {
nowt := time.Now()
if expired.Before(nowt) || expired.Equal(nowt) {
currentState = StateHalfOpen //端开状态的计时器过期了,转为半开
cnt.ConsecutiveSuccesses = 0 //重置用于累计成功调用的计数器
}
} }
switch currentState { cnt.Count(SuccessState)
case StateOpen:
return ErrServiceUnavailable //直接失败
case StateHalfOpen:
if err := c(ctx); err != nil {
//统计状态
cnt.Count(FailureState)
currentState = StateOpen
expired = time.Now().Add(defaultTimeout) //Reset默认的超时时间
return err
}
//统计成功状态
cnt.Count(SuccessState)
//处理状态转换
if cnt.ConsecutiveSuccesses > defaultSuccessThreshold {
currentState = StateClosed
cnt.ConsecutiveFailures = 0
}
return nil
case StateClosed:
}
return nil return nil
} }
} }