add state handler proc

This commit is contained in:
Edward 2020-06-02 23:55:59 +08:00
parent 85d2ac3d20
commit aecf5d10fd

View File

@ -46,7 +46,7 @@ type RequestBreaker struct {
mutex sync.Mutex mutex sync.Mutex
state State state State
generation uint64 generation uint64
cnter ICounter cnter counters
} }
//NewRequestBreaker return a breaker //NewRequestBreaker return a breaker
@ -69,7 +69,7 @@ func NewRequestBreaker(opts ...Option) *RequestBreaker {
return &RequestBreaker{ return &RequestBreaker{
options: defaultOptions, options: defaultOptions,
cnter: &counters{}, cnter: counters{},
generation: 0, generation: 0,
} }
} }
@ -80,6 +80,8 @@ func NewRequestBreaker(opts ...Option) *RequestBreaker {
// If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again. // If a panic occurs in the request, the RequestBreaker handles it as an error and causes the same panic again.
func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)) (interface{}, error) { func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)) (interface{}, error) {
preState := StateUnknown
//before //before
fmt.Println("before do : request:", rb.cnter.Total()) fmt.Println("before do : request:", rb.cnter.Total())
rb.mutex.Lock() rb.mutex.Lock()
@ -87,22 +89,14 @@ func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)
case StateOpen: case StateOpen:
return nil, ErrTooManyRequests return nil, ErrTooManyRequests
case StateHalfOpen: case StateHalfOpen:
//do work from requested user //如果是断开状态,并且超时了,转到半开状态,记录
// result, err := work(rb.options.Ctx)
// if err != nil {
// rb.cnter.Count(FailureState)
// } else {
// rb.cnter.Count(SuccessState)
// return result, nil
// }
if rb.options.Expiry.Before(time.Now()) { if rb.options.Expiry.Before(time.Now()) {
rb.state = StateHalfOpen rb.state = StateHalfOpen
preState = rb.state
rb.cnter.Reset() rb.cnter.Reset()
rb.options.OnStateChanged(rb.options.Name, StateOpen, StateHalfOpen)
} }
case StateClosed:
} }
rb.mutex.Unlock() rb.mutex.Unlock()
@ -111,12 +105,29 @@ func (rb *RequestBreaker) Do(work func(ctx context.Context) (interface{}, error)
//do work from requested user //do work from requested user
result, err := work(rb.options.Ctx) result, err := work(rb.options.Ctx)
rb.mutex.Lock()
//失败了
if err != nil { if err != nil {
rb.cnter.Count(FailureState) rb.cnter.Count(FailureState)
//如果是在半开状态下的失败,立即打开开关
if rb.state == StateHalfOpen {
rb.state = StateOpen //转为打开
} else if rb.state == StateClosed {
if rb.options.WhenToBreak(rb.cnter) {
rb.state = StateOpen //打开开关
rb.cnter.Reset()
}
}
} else { } else {
rb.cnter.Count(SuccessState) rb.cnter.Count(SuccessState)
} if preState == StateOpen && rb.state == StateHalfOpen {
rb.options.OnStateChanged(rb.options.Name, StateOpen, StateHalfOpen)
}
}
rb.mutex.Unlock()
//after
fmt.Println("after do : request:", rb.cnter.Total()) fmt.Println("after do : request:", rb.cnter.Total())
return result, err return result, err