add a breaker patten

This commit is contained in:
Edward 2020-05-03 10:54:01 +08:00
parent ed3cbab384
commit eabae2e9b7
2 changed files with 357 additions and 0 deletions

View File

@ -0,0 +1,161 @@
// Package breaker implements the circuit-breaker resiliency pattern for Go.
package breaker
import (
"errors"
"sync"
"sync/atomic"
"time"
)
// ErrBreakerOpen is the error returned from Run() when the function is not executed
// because the breaker is currently open.
var ErrBreakerOpen = errors.New("circuit breaker is open")
const (
closed uint32 = iota
open
halfOpen
)
// Breaker implements the circuit-breaker resiliency pattern
type Breaker struct {
errorThreshold, successThreshold int
timeout time.Duration
lock sync.Mutex
state uint32
errors, successes int
lastError time.Time
}
// New constructs a new circuit-breaker that starts closed.
// From closed, the breaker opens if "errorThreshold" errors are seen
// without an error-free period of at least "timeout". From open, the
// breaker half-closes after "timeout". From half-open, the breaker closes
// after "successThreshold" consecutive successes, or opens on a single error.
func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
return &Breaker{
errorThreshold: errorThreshold,
successThreshold: successThreshold,
timeout: timeout,
}
}
// Run will either return ErrBreakerOpen immediately if the circuit-breaker is
// already open, or it will run the given function and pass along its return
// value. It is safe to call Run concurrently on the same Breaker.
func (b *Breaker) Run(work func() error) error {
state := atomic.LoadUint32(&b.state)
if state == open {
return ErrBreakerOpen
}
return b.doWork(state, work)
}
// Go will either return ErrBreakerOpen immediately if the circuit-breaker is
// already open, or it will run the given function in a separate goroutine.
// If the function is run, Go will return nil immediately, and will *not* return
// the return value of the function. It is safe to call Go concurrently on the
// same Breaker.
func (b *Breaker) Go(work func() error) error {
state := atomic.LoadUint32(&b.state)
if state == open {
return ErrBreakerOpen
}
// errcheck complains about ignoring the error return value, but
// that's on purpose; if you want an error from a goroutine you have to
// get it over a channel or something
go b.doWork(state, work)
return nil
}
func (b *Breaker) doWork(state uint32, work func() error) error {
var panicValue interface{}
result := func() error {
defer func() {
panicValue = recover()
}()
return work()
}()
if result == nil && panicValue == nil && state == closed {
// short-circuit the normal, success path without contending
// on the lock
return nil
}
// oh well, I guess we have to contend on the lock
b.processResult(result, panicValue)
if panicValue != nil {
// as close as Go lets us come to a "rethrow" although unfortunately
// we lose the original panicing location
panic(panicValue)
}
return result
}
func (b *Breaker) processResult(result error, panicValue interface{}) {
b.lock.Lock()
defer b.lock.Unlock()
if result == nil && panicValue == nil {
if b.state == halfOpen {
b.successes++
if b.successes == b.successThreshold {
b.closeBreaker()
}
}
} else {
if b.errors > 0 {
expiry := b.lastError.Add(b.timeout)
if time.Now().After(expiry) {
b.errors = 0
}
}
switch b.state {
case closed:
b.errors++
if b.errors == b.errorThreshold {
b.openBreaker()
} else {
b.lastError = time.Now()
}
case halfOpen:
b.openBreaker()
}
}
}
func (b *Breaker) openBreaker() {
b.changeState(open)
go b.timer()
}
func (b *Breaker) closeBreaker() {
b.changeState(closed)
}
func (b *Breaker) timer() {
time.Sleep(b.timeout)
b.lock.Lock()
defer b.lock.Unlock()
b.changeState(halfOpen)
}
func (b *Breaker) changeState(newState uint32) {
b.errors = 0
b.successes = 0
atomic.StoreUint32(&b.state, newState)
}

View File

@ -0,0 +1,196 @@
package breaker
import (
"errors"
"testing"
"time"
)
var errSomeError = errors.New("errSomeError")
func alwaysPanics() error {
panic("foo")
}
func returnsError() error {
return errSomeError
}
func returnsSuccess() error {
return nil
}
func TestBreakerErrorExpiry(t *testing.T) {
breaker := New(2, 1, 1*time.Second)
for i := 0; i < 3; i++ {
if err := breaker.Run(returnsError); err != errSomeError {
t.Error(err)
}
time.Sleep(1 * time.Second)
}
for i := 0; i < 3; i++ {
if err := breaker.Go(returnsError); err != nil {
t.Error(err)
}
time.Sleep(1 * time.Second)
}
}
func TestBreakerPanicsCountAsErrors(t *testing.T) {
breaker := New(3, 2, 1*time.Second)
// three errors opens the breaker
for i := 0; i < 3; i++ {
func() {
defer func() {
val := recover()
if val.(string) != "foo" {
t.Error("incorrect panic")
}
}()
if err := breaker.Run(alwaysPanics); err != nil {
t.Error(err)
}
t.Error("shouldn't get here")
}()
}
// breaker is open
for i := 0; i < 5; i++ {
if err := breaker.Run(returnsError); err != ErrBreakerOpen {
t.Error(err)
}
}
}
func TestBreakerStateTransitions(t *testing.T) {
breaker := New(3, 2, 1*time.Second)
// three errors opens the breaker
for i := 0; i < 3; i++ {
if err := breaker.Run(returnsError); err != errSomeError {
t.Error(err)
}
}
// breaker is open
for i := 0; i < 5; i++ {
if err := breaker.Run(returnsError); err != ErrBreakerOpen {
t.Error(err)
}
}
// wait for it to half-close
time.Sleep(2 * time.Second)
// one success works, but is not enough to fully close
if err := breaker.Run(returnsSuccess); err != nil {
t.Error(err)
}
// error works, but re-opens immediately
if err := breaker.Run(returnsError); err != errSomeError {
t.Error(err)
}
// breaker is open
if err := breaker.Run(returnsError); err != ErrBreakerOpen {
t.Error(err)
}
// wait for it to half-close
time.Sleep(2 * time.Second)
// two successes is enough to close it for good
for i := 0; i < 2; i++ {
if err := breaker.Run(returnsSuccess); err != nil {
t.Error(err)
}
}
// error works
if err := breaker.Run(returnsError); err != errSomeError {
t.Error(err)
}
// breaker is still closed
if err := breaker.Run(returnsSuccess); err != nil {
t.Error(err)
}
}
func TestBreakerAsyncStateTransitions(t *testing.T) {
breaker := New(3, 2, 1*time.Second)
// three errors opens the breaker
for i := 0; i < 3; i++ {
if err := breaker.Go(returnsError); err != nil {
t.Error(err)
}
}
// just enough to yield the scheduler and let the goroutines work off
time.Sleep(1 * time.Millisecond)
// breaker is open
for i := 0; i < 5; i++ {
if err := breaker.Go(returnsError); err != ErrBreakerOpen {
t.Error(err)
}
}
// wait for it to half-close
time.Sleep(2 * time.Second)
// one success works, but is not enough to fully close
if err := breaker.Go(returnsSuccess); err != nil {
t.Error(err)
}
// error works, but re-opens immediately
if err := breaker.Go(returnsError); err != nil {
t.Error(err)
}
// just enough to yield the scheduler and let the goroutines work off
time.Sleep(1 * time.Millisecond)
// breaker is open
if err := breaker.Go(returnsError); err != ErrBreakerOpen {
t.Error(err)
}
// wait for it to half-close
time.Sleep(2 * time.Second)
// two successes is enough to close it for good
for i := 0; i < 2; i++ {
if err := breaker.Go(returnsSuccess); err != nil {
t.Error(err)
}
}
// just enough to yield the scheduler and let the goroutines work off
time.Sleep(1 * time.Millisecond)
// error works
if err := breaker.Go(returnsError); err != nil {
t.Error(err)
}
// just enough to yield the scheduler and let the goroutines work off
time.Sleep(1 * time.Millisecond)
// breaker is still closed
if err := breaker.Go(returnsSuccess); err != nil {
t.Error(err)
}
}
func ExampleBreaker() {
breaker := New(3, 1, 5*time.Second)
for {
result := breaker.Run(func() error {
// communicate with some external service and
// return an error if the communication failed
return nil
})
switch result {
case nil:
// success!
case ErrBreakerOpen:
// our function wasn't run because the breaker was open
default:
// some other error
}
}
}