package breaker import ( "errors" "sync" "sync/atomic" "time" ) // ErrBreakerOpen is the error returned from Run() when the function is not executed // because the breaker is currently open. var ErrBreakerOpen = errors.New("circuit breaker is open") const ( closed uint32 = iota open halfOpen ) // Breaker implements the circuit-breaker resiliency pattern type Breaker struct { errorThreshold, successThreshold int timeout time.Duration lock sync.Mutex state uint32 errors, successes int lastError time.Time } // New constructs a new circuit-breaker that starts closed. // From closed, the breaker opens if "errorThreshold" errors are seen // without an error-free period of at least "timeout". From open, the // breaker half-closes after "timeout". From half-open, the breaker closes // after "successThreshold" consecutive successes, or opens on a single error. func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker { return &Breaker{ errorThreshold: errorThreshold, successThreshold: successThreshold, timeout: timeout, } } // Run will either return ErrBreakerOpen immediately if the circuit-breaker is // already open, or it will run the given function and pass along its return // value. It is safe to call Run concurrently on the same Breaker. func (b *Breaker) Run(work func() error) error { state := atomic.LoadUint32(&b.state) if state == open { return ErrBreakerOpen } return b.doWork(state, work) } // Go will either return ErrBreakerOpen immediately if the circuit-breaker is // already open, or it will run the given function in a separate goroutine. // If the function is run, Go will return nil immediately, and will *not* return // the return value of the function. It is safe to call Go concurrently on the // same Breaker. func (b *Breaker) Go(work func() error) error { state := atomic.LoadUint32(&b.state) if state == open { return ErrBreakerOpen } // errcheck complains about ignoring the error return value, but // that's on purpose; if you want an error from a goroutine you have to // get it over a channel or something go b.doWork(state, work) return nil } func (b *Breaker) doWork(state uint32, work func() error) error { var panicValue interface{} result := func() error { defer func() { panicValue = recover() }() return work() }() if result == nil && panicValue == nil && state == closed { // short-circuit the normal, success path without contending // on the lock return nil } // oh well, I guess we have to contend on the lock b.processResult(result, panicValue) if panicValue != nil { // as close as Go lets us come to a "rethrow" although unfortunately // we lose the original panicing location panic(panicValue) } return result } func (b *Breaker) processResult(result error, panicValue interface{}) { b.lock.Lock() defer b.lock.Unlock() if result == nil && panicValue == nil { if b.state == halfOpen { b.successes++ if b.successes == b.successThreshold { b.closeBreaker() } } } else { if b.errors > 0 { expiry := b.lastError.Add(b.timeout) if time.Now().After(expiry) { b.errors = 0 } } switch b.state { case closed: b.errors++ if b.errors == b.errorThreshold { b.openBreaker() } else { b.lastError = time.Now() } case halfOpen: b.openBreaker() } } } func (b *Breaker) openBreaker() { b.changeState(open) go b.timer() } func (b *Breaker) closeBreaker() { b.changeState(closed) } func (b *Breaker) timer() { time.Sleep(b.timeout) b.lock.Lock() defer b.lock.Unlock() b.changeState(halfOpen) } func (b *Breaker) changeState(newState uint32) { b.errors = 0 b.successes = 0 atomic.StoreUint32(&b.state, newState) }