diff --git a/gomore/06_circuit_breaker/breaker/breaker.go b/gomore/06_circuit_breaker/breaker/breaker.go new file mode 100644 index 0000000..f88ca72 --- /dev/null +++ b/gomore/06_circuit_breaker/breaker/breaker.go @@ -0,0 +1,161 @@ +// Package breaker implements the circuit-breaker resiliency pattern for Go. +package breaker + +import ( + "errors" + "sync" + "sync/atomic" + "time" +) + +// ErrBreakerOpen is the error returned from Run() and Go() when the function is not executed +// because the breaker is currently open. +var ErrBreakerOpen = errors.New("circuit breaker is open") + +const ( + closed uint32 = iota // zero value, so a Breaker built by New starts closed + open // Run and Go reject work with ErrBreakerOpen + halfOpen // work runs again; its outcomes decide between closing and re-opening +) + +// Breaker implements the circuit-breaker resiliency pattern. +type Breaker struct { + errorThreshold, successThreshold int + timeout time.Duration + + lock sync.Mutex // held while the fields below are updated + state uint32 // closed, open or halfOpen; also read without the lock via sync/atomic + errors, successes int // errors counted while closed, successes counted while half-open + lastError time.Time // set when an error is counted without opening the breaker +} + +// New constructs a new circuit-breaker that starts closed. +// From closed, the breaker opens if "errorThreshold" errors are seen +// without an error-free period of at least "timeout". From open, the +// breaker becomes half-open after "timeout". From half-open, the breaker closes +// after "successThreshold" consecutive successes, or opens on a single error. +func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker { + return &Breaker{ + errorThreshold: errorThreshold, + successThreshold: successThreshold, + timeout: timeout, + } +} + +// Run will either return ErrBreakerOpen immediately if the circuit-breaker is +// already open, or it will run the given function and pass along its return +// value. A panic in the function is counted as an error and then re-raised. It is safe to call Run concurrently on the same Breaker. +func (b *Breaker) Run(work func() error) error { + state := atomic.LoadUint32(&b.state) + + if state == open { + return ErrBreakerOpen + } + + return b.doWork(state, work) +} + +// Go will either return ErrBreakerOpen immediately if the circuit-breaker is +// already open, or it will run the given function in a separate goroutine. 
+// If the function is run, Go will return nil immediately, and will *not* return +// the return value of the function. It is safe to call Go concurrently on the +// same Breaker. A panic in the function is re-raised inside that goroutine. +func (b *Breaker) Go(work func() error) error { + state := atomic.LoadUint32(&b.state) + + if state == open { + return ErrBreakerOpen + } + + // errcheck complains about ignoring the error return value, but + // that's on purpose; if you want an error from a goroutine you have to + // get it over a channel or something similar + go b.doWork(state, work) + + return nil +} + +func (b *Breaker) doWork(state uint32, work func() error) error { // state is the caller's snapshot of b.state + var panicValue interface{} // holds the recovered value if work panics + + result := func() error { + defer func() { + panicValue = recover() + }() + return work() + }() + + if result == nil && panicValue == nil && state == closed { + // short-circuit the normal success path (no error, no panic, breaker + // was closed) without contending on the lock + return nil + } + + // anything else may change the breaker's state, so record it under the lock + b.processResult(result, panicValue) + + if panicValue != nil { + // as close as Go lets us come to a "rethrow" although unfortunately + // we lose the original panicking location + panic(panicValue) + } + + return result +} + +func (b *Breaker) processResult(result error, panicValue interface{}) { // records one outcome and applies any resulting state transition + b.lock.Lock() + defer b.lock.Unlock() + + if result == nil && panicValue == nil { + if b.state == halfOpen { // successes are only counted while half-open + b.successes++ + if b.successes == b.successThreshold { + b.closeBreaker() + } + } + } else { + if b.errors > 0 { + expiry := b.lastError.Add(b.timeout) + if time.Now().After(expiry) { + b.errors = 0 // the previous error is older than "timeout", so start counting afresh + } + } + + switch b.state { + case closed: + b.errors++ + if b.errors == b.errorThreshold { + b.openBreaker() + } else { + b.lastError = time.Now() + } + case halfOpen: // any failure while half-open re-opens the breaker + b.openBreaker() + } + } +} + +func (b *Breaker) openBreaker() { + b.changeState(open) + go b.timer() // moves the breaker to half-open once "timeout" has elapsed +} + +func (b *Breaker) closeBreaker() { + b.changeState(closed) +} + +func (b *Breaker) timer() { + time.Sleep(b.timeout) + + b.lock.Lock() + defer 
b.lock.Unlock() + + b.changeState(halfOpen) +} + +func (b *Breaker) changeState(newState uint32) { + b.errors = 0 + b.successes = 0 + atomic.StoreUint32(&b.state, newState) +} diff --git a/gomore/06_circuit_breaker/breaker/breaker_test.go b/gomore/06_circuit_breaker/breaker/breaker_test.go new file mode 100644 index 0000000..b41308d --- /dev/null +++ b/gomore/06_circuit_breaker/breaker/breaker_test.go @@ -0,0 +1,196 @@ +package breaker + +import ( + "errors" + "testing" + "time" +) + +var errSomeError = errors.New("errSomeError") + +func alwaysPanics() error { + panic("foo") +} + +func returnsError() error { + return errSomeError +} + +func returnsSuccess() error { + return nil +} + +func TestBreakerErrorExpiry(t *testing.T) { + breaker := New(2, 1, 1*time.Second) + + for i := 0; i < 3; i++ { + if err := breaker.Run(returnsError); err != errSomeError { + t.Error(err) + } + time.Sleep(1 * time.Second) + } + + for i := 0; i < 3; i++ { + if err := breaker.Go(returnsError); err != nil { + t.Error(err) + } + time.Sleep(1 * time.Second) + } +} + +func TestBreakerPanicsCountAsErrors(t *testing.T) { + breaker := New(3, 2, 1*time.Second) + + // three panics open the breaker (each one counts as an error) + for i := 0; i < 3; i++ { + func() { + defer func() { + val := recover() + if val.(string) != "foo" { + t.Error("incorrect panic") + } + }() + if err := breaker.Run(alwaysPanics); err != nil { + t.Error(err) + } + t.Error("shouldn't get here") + }() + } + + // breaker is open, so work is rejected without being run + for i := 0; i < 5; i++ { + if err := breaker.Run(returnsError); err != ErrBreakerOpen { + t.Error(err) + } + } +} + +func TestBreakerStateTransitions(t *testing.T) { + breaker := New(3, 2, 1*time.Second) + + // three errors open the breaker + for i := 0; i < 3; i++ { + if err := breaker.Run(returnsError); err != errSomeError { + t.Error(err) + } + } + + // breaker is open, so work is rejected without being run + for i := 0; i < 5; i++ { + if err := breaker.Run(returnsError); err != ErrBreakerOpen { + t.Error(err) + } + } + + // wait for it to become half-open (the timeout is 1s) + 
time.Sleep(2 * time.Second) + // one success works, but is not enough to fully close + if err := breaker.Run(returnsSuccess); err != nil { + t.Error(err) + } + // the error is passed through, and re-opens the breaker immediately + if err := breaker.Run(returnsError); err != errSomeError { + t.Error(err) + } + // breaker is open + if err := breaker.Run(returnsError); err != ErrBreakerOpen { + t.Error(err) + } + + // wait for it to become half-open again + time.Sleep(2 * time.Second) + // two successes are enough to close it for good + for i := 0; i < 2; i++ { + if err := breaker.Run(returnsSuccess); err != nil { + t.Error(err) + } + } + // a single error is passed through (the threshold is 3) + if err := breaker.Run(returnsError); err != errSomeError { + t.Error(err) + } + // breaker is still closed + if err := breaker.Run(returnsSuccess); err != nil { + t.Error(err) + } +} + +func TestBreakerAsyncStateTransitions(t *testing.T) { + breaker := New(3, 2, 1*time.Second) + + // three errors open the breaker + for i := 0; i < 3; i++ { + if err := breaker.Go(returnsError); err != nil { + t.Error(err) + } + } + + // sleep briefly so the spawned goroutines can run and record their results + time.Sleep(1 * time.Millisecond) + + // breaker is open, so work is rejected without being run + for i := 0; i < 5; i++ { + if err := breaker.Go(returnsError); err != ErrBreakerOpen { + t.Error(err) + } + } + + // wait for it to become half-open (the timeout is 1s) + time.Sleep(2 * time.Second) + // one success works, but is not enough to fully close + if err := breaker.Go(returnsSuccess); err != nil { + t.Error(err) + } + // the failing call is accepted, and its error re-opens the breaker + if err := breaker.Go(returnsError); err != nil { + t.Error(err) + } + // sleep briefly so the spawned goroutines can run and record their results + time.Sleep(1 * time.Millisecond) + // breaker is open + if err := breaker.Go(returnsError); err != ErrBreakerOpen { + t.Error(err) + } + + // wait for it to become half-open again + time.Sleep(2 * time.Second) + // two successes are enough to close it for good + for i := 0; i < 2; i++ { + if err := breaker.Go(returnsSuccess); err != nil { + 
t.Error(err) + } + } + // sleep briefly so the spawned goroutines can run and record their results + time.Sleep(1 * time.Millisecond) + // a single failing call is accepted (the threshold is 3) + if err := breaker.Go(returnsError); err != nil { + t.Error(err) + } + // sleep briefly so the spawned goroutine can run and record its result + time.Sleep(1 * time.Millisecond) + // breaker is still closed + if err := breaker.Go(returnsSuccess); err != nil { + t.Error(err) + } +} + +func ExampleBreaker() { + breaker := New(3, 1, 5*time.Second) + + for { + result := breaker.Run(func() error { + // communicate with some external service and + // return an error if the communication failed + return nil + }) + + switch result { + case nil: + // success! + case ErrBreakerOpen: + // our function wasn't run because the breaker was open + default: + // some other error returned by our function + } + } +}