finish the simple circuit_breaker

This commit is contained in:
Edward 2020-05-22 16:50:41 +08:00
parent c2491af3d4
commit c8bd5eed55
4 changed files with 55 additions and 12 deletions

View File

@ -3,7 +3,7 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-10 22:00:58 * @Date: 2020-05-10 22:00:58
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-22 14:22:42 * @Last Modified time: 2020-05-22 16:44:57
*/ */
package circuit package circuit
@ -34,7 +34,8 @@ const (
) )
//////////////////////////////// ////////////////////////////////
//way 1 对象式断路器 //way 2 对象式断路器
// 高级模式
//////////////////////////////// ////////////////////////////////
//RequestBreaker for protection //RequestBreaker for protection
@ -80,6 +81,8 @@ func (rb *RequestBreaker) Do(work func() (interface{}, error)) (interface{}, err
//before //before
fmt.Println("before do : request:", rb.counts.Total()) fmt.Println("before do : request:", rb.counts.Total())
//handle status
//do work from requested user //do work from requested user
result, err := work() result, err := work()

View File

@ -3,7 +3,7 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-11 10:55:28 * @Date: 2020-05-11 10:55:28
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-21 14:08:53 * @Last Modified time: 2020-05-22 16:37:21
*/ */
package circuit package circuit
@ -12,13 +12,14 @@ import (
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log"
"net/http" "net/http"
"testing" "testing"
) )
var breaker *RequestBreaker var breaker *RequestBreaker
func TestBasicBreaker(t *testing.T) { func TestObjectBreaker(t *testing.T) {
jobToDo := func() (interface{}, error) { jobToDo := func() (interface{}, error) {
resp, err := http.Get("https://bing.com/robots.txt") resp, err := http.Get("https://bing.com/robots.txt")
@ -72,6 +73,8 @@ func TestFunctionalBreaker(t *testing.T) {
params := context.TODO() params := context.TODO()
// do the job as usually // do the job as usually
circuitWork(params) res := circuitWork(params)
log.Print("\nresult:", res)
} }

View File

@ -3,7 +3,7 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-22 12:41:54 * @Date: 2020-05-22 12:41:54
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-22 14:21:00 * @Last Modified time: 2020-05-22 16:47:37
*/ */
package circuit package circuit
@ -42,7 +42,6 @@ type counters struct {
Requests uint32 //连续的请求次数 Requests uint32 //连续的请求次数
lastState OperationState lastState OperationState
lastActivity time.Time lastActivity time.Time
counts uint32 //counts of failures
TotalFailures uint32 TotalFailures uint32
TotalSuccesses uint32 TotalSuccesses uint32
ConsecutiveSuccesses uint32 ConsecutiveSuccesses uint32

View File

@ -3,7 +3,7 @@
* @Author: Edward * @Author: Edward
* @Date: 2020-05-22 12:42:34 * @Date: 2020-05-22 12:42:34
* @Last Modified by: Edward * @Last Modified by: Edward
* @Last Modified time: 2020-05-22 14:35:00 * @Last Modified time: 2020-05-22 16:48:56
*/ */
package circuit package circuit
@ -16,7 +16,7 @@ import (
//////////////////////////////// ////////////////////////////////
//way 2 简单的函数式断路器 //way 2 简单的函数式断路器
// 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问 // 一个func实例作为一个Breaker 允许多个worker(即goroutine)同时访问
// 理论上讲也需要考虑同步问题 // 当前简单场景下只考虑单个worker情况下的连续请求
// 当前的设计思路,非常简单: // 当前的设计思路,非常简单:
// 1、不考虑三种状态之间的转换只靠两种状态电路打开与电路关闭 // 1、不考虑三种状态之间的转换只靠两种状态电路打开与电路关闭
// 2、当累计失败到达一定失败次数就端开请求(打开电路),并延迟一定的时间再允许请求 // 2、当累计失败到达一定失败次数就端开请求(打开电路),并延迟一定的时间再允许请求
@ -30,11 +30,44 @@ const (
SuccessState SuccessState
) )
type simpleCounter struct {
lastState OperationState
lastActivity time.Time
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
func (c *simpleCounter) LastActivity() time.Time {
return c.lastActivity
}
func (c *simpleCounter) Reset() {
ct := &simpleCounter{}
ct.lastActivity = c.lastActivity
ct.lastState = UnknownState
c = ct
}
//Count the failure and success
func (c *simpleCounter) Count(statue OperationState) {
switch statue {
case FailureState:
c.ConsecutiveFailures++
case SuccessState:
c.ConsecutiveSuccesses++
}
c.lastActivity = time.Now() //更新活动时间
c.lastState = statue
//handle status change
}
//Circuit of action stream,this is actually to do something. //Circuit of action stream,this is actually to do something.
//Circuit hold the really action //Circuit hold the really action
type Circuit func(context.Context) error type Circuit func(context.Context) error
var canRetry = func(cnt counters, failureThreshold uint32) bool { //失败达到阈值后,过两秒重试
var canRetry = func(cnt simpleCounter, failureThreshold uint32) bool {
backoffLevel := cnt.ConsecutiveFailures - failureThreshold backoffLevel := cnt.ConsecutiveFailures - failureThreshold
// Calculates when should the circuit breaker resume propagating requests // Calculates when should the circuit breaker resume propagating requests
// to the service // to the service
@ -42,25 +75,30 @@ var canRetry = func(cnt counters, failureThreshold uint32) bool {
return time.Now().After(shouldRetryAt) return time.Now().After(shouldRetryAt)
} }
//Breaker return a closure wrapper to hold request //Breaker return a closure wrapper to hold Circuit Request
func Breaker(c Circuit, failureThreshold uint32) Circuit { func Breaker(c Circuit, failureThreshold uint32) Circuit {
//闭包内部的全局计数器 和状态标志 //闭包内部的全局计数器 和状态标志
cnt := counters{} cnt := simpleCounter{}
//ctx can be used hold parameters //ctx can be used hold parameters
return func(ctx context.Context) error { return func(ctx context.Context) error {
//阻止请求
if cnt.ConsecutiveFailures >= failureThreshold { if cnt.ConsecutiveFailures >= failureThreshold {
if !canRetry(cnt, failureThreshold) { if !canRetry(cnt, failureThreshold) {
// Fails fast instead of propagating requests to the circuit since // Fails fast instead of propagating requests to the circuit since
// not enough time has passed since the last failure to retry // not enough time has passed since the last failure to retry
return ErrServiceUnavailable return ErrServiceUnavailable
} }
//reset mark for failures
cnt.ConsecutiveFailures = 0
} }
// Unless the failure threshold is exceeded the wrapped service mimics the // Unless the failure threshold is exceeded the wrapped service mimics the
// old behavior and the difference in behavior is seen after consecutive failures // old behavior and the difference in behavior is seen after consecutive failures
if err := c(ctx); err != nil { if err := c(ctx); err != nil {
//连续失败会增大backoff 时间
cnt.Count(FailureState) cnt.Count(FailureState)
return err return err
} }