From c8bd5eed55f94d844befa8b4ed24ce731ecf613e Mon Sep 17 00:00:00 2001 From: Edward Date: Fri, 22 May 2020 16:50:41 +0800 Subject: [PATCH] finish the simple circuit_breaker --- .../06_circuit_breaker/circuit_breaker.go | 7 ++- .../circuit_breaker_test.go | 9 ++-- .../{circuit_conter.go => circuit_counter.go} | 3 +- .../circuit_func_closure.go | 48 +++++++++++++++++-- 4 files changed, 55 insertions(+), 12 deletions(-) rename resiliency/06_circuit_breaker/{circuit_conter.go => circuit_counter.go} (94%) diff --git a/resiliency/06_circuit_breaker/circuit_breaker.go b/resiliency/06_circuit_breaker/circuit_breaker.go index 4b36cc8..6ef678b 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker.go +++ b/resiliency/06_circuit_breaker/circuit_breaker.go @@ -3,7 +3,7 @@ * @Author: Edward * @Date: 2020-05-10 22:00:58 * @Last Modified by: Edward - * @Last Modified time: 2020-05-22 14:22:42 + * @Last Modified time: 2020-05-22 16:44:57 */ package circuit @@ -34,7 +34,8 @@ const ( ) //////////////////////////////// -//way 1 对象式断路器 +//way 2 对象式断路器 +// 高级模式 //////////////////////////////// //RequestBreaker for protection @@ -80,6 +81,8 @@ func (rb *RequestBreaker) Do(work func() (interface{}, error)) (interface{}, err //before fmt.Println("before do : request:", rb.counts.Total()) + //handle status + //do work from requested user result, err := work() diff --git a/resiliency/06_circuit_breaker/circuit_breaker_test.go b/resiliency/06_circuit_breaker/circuit_breaker_test.go index 10bc9dd..9d30a55 100644 --- a/resiliency/06_circuit_breaker/circuit_breaker_test.go +++ b/resiliency/06_circuit_breaker/circuit_breaker_test.go @@ -3,7 +3,7 @@ * @Author: Edward * @Date: 2020-05-11 10:55:28 * @Last Modified by: Edward - * @Last Modified time: 2020-05-21 14:08:53 + * @Last Modified time: 2020-05-22 16:37:21 */ package circuit @@ -12,13 +12,14 @@ import ( "context" "fmt" "io/ioutil" + "log" "net/http" "testing" ) var breaker *RequestBreaker -func TestBasicBreaker(t *testing.T) { +func TestObjectBreaker(t *testing.T) { jobToDo := func() (interface{}, error) { resp, err := http.Get("https://bing.com/robots.txt") @@ -72,6 +73,8 @@ func TestFunctionalBreaker(t *testing.T) { params := context.TODO() // do the job as usually - circuitWork(params) + res := circuitWork(params) + + log.Print("\nresult:", res) } diff --git a/resiliency/06_circuit_breaker/circuit_conter.go b/resiliency/06_circuit_breaker/circuit_counter.go similarity index 94% rename from resiliency/06_circuit_breaker/circuit_conter.go rename to resiliency/06_circuit_breaker/circuit_counter.go index c048c31..5708243 100644 --- a/resiliency/06_circuit_breaker/circuit_conter.go +++ b/resiliency/06_circuit_breaker/circuit_counter.go @@ -3,7 +3,7 @@ * @Author: Edward * @Date: 2020-05-22 12:41:54 * @Last Modified by: Edward - * @Last Modified time: 2020-05-22 14:21:00 + * @Last Modified time: 2020-05-22 16:47:37 */ package circuit @@ -42,7 +42,6 @@ type counters struct { Requests uint32 //连续的请求次数 lastState OperationState lastActivity time.Time - counts uint32 //counts of failures TotalFailures uint32 TotalSuccesses uint32 ConsecutiveSuccesses uint32 diff --git a/resiliency/06_circuit_breaker/circuit_func_closure.go b/resiliency/06_circuit_breaker/circuit_func_closure.go index 0ede755..b0304f8 100644 --- a/resiliency/06_circuit_breaker/circuit_func_closure.go +++ b/resiliency/06_circuit_breaker/circuit_func_closure.go @@ -3,7 +3,7 @@ * @Author: Edward * @Date: 2020-05-22 12:42:34 * @Last Modified by: Edward - * @Last Modified time: 2020-05-22 14:35:00 + * @Last Modified time: 2020-05-22 16:48:56 */ package circuit @@ -16,7 +16,7 @@ import ( //////////////////////////////// //way 2 简单的函数式断路器 // 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问 -// 理论上讲也需要考虑同步问题 +// 当前简单场景下,只考虑单个worker情况下的连续请求 // 当前的设计思路,非常简单: // 1、不考虑三种状态之间的转换,只靠两种状态,电路打开与电路关闭 // 2、当累计失败到达一定失败次数就端开请求(打开电路),并延迟一定的时间再允许请求 @@ -30,11 +30,44 @@ const ( SuccessState ) +type simpleCounter struct { + lastState OperationState + lastActivity time.Time + ConsecutiveSuccesses uint32 + ConsecutiveFailures uint32 +} + +func (c *simpleCounter) LastActivity() time.Time { + return c.lastActivity +} + +func (c *simpleCounter) Reset() { + ct := &simpleCounter{} + ct.lastActivity = c.lastActivity + ct.lastState = UnknownState + c = ct +} + +//Count the failure and success +func (c *simpleCounter) Count(statue OperationState) { + + switch statue { + case FailureState: + c.ConsecutiveFailures++ + case SuccessState: + c.ConsecutiveSuccesses++ + } + c.lastActivity = time.Now() //更新活动时间 + c.lastState = statue + //handle status change +} + //Circuit of action stream,this is actually to do something. //Circuit hold the really action type Circuit func(context.Context) error -var canRetry = func(cnt counters, failureThreshold uint32) bool { +//失败达到阈值后,过两秒重试 +var canRetry = func(cnt simpleCounter, failureThreshold uint32) bool { backoffLevel := cnt.ConsecutiveFailures - failureThreshold // Calculates when should the circuit breaker resume propagating requests // to the service @@ -42,25 +75,30 @@ var canRetry = func(cnt counters, failureThreshold uint32) bool { return time.Now().After(shouldRetryAt) } -//Breaker return a closure wrapper to hold request +//Breaker return a closure wrapper to hold Circuit Request func Breaker(c Circuit, failureThreshold uint32) Circuit { //闭包内部的全局计数器 和状态标志 - cnt := counters{} + cnt := simpleCounter{} //ctx can be used hold parameters return func(ctx context.Context) error { + //阻止请求 if cnt.ConsecutiveFailures >= failureThreshold { if !canRetry(cnt, failureThreshold) { // Fails fast instead of propagating requests to the circuit since // not enough time has passed since the last failure to retry return ErrServiceUnavailable } + //reset mark for failures + cnt.ConsecutiveFailures = 0 } + // Unless the failure threshold is exceeded the wrapped service mimics the // old behavior and the difference in behavior is seen after consecutive failures if err := c(ctx); err != nil { + //连续失败会增大backoff 时间 cnt.Count(FailureState) return err }