diff --git a/gomore/batcher/README.md b/gomore/batcher/README.md
new file mode 100644
index 0000000..60e3b9f
--- /dev/null
+++ b/gomore/batcher/README.md
@@ -0,0 +1,35 @@
+batcher
+=======
+
+[![Build Status](https://travis-ci.org/eapache/go-resiliency.svg?branch=master)](https://travis-ci.org/eapache/go-resiliency)
+[![GoDoc](https://godoc.org/github.com/eapache/go-resiliency/batcher?status.svg)](https://godoc.org/github.com/eapache/go-resiliency/batcher)
+[![Code of Conduct](https://img.shields.io/badge/code%20of%20conduct-active-blue.svg)](https://eapache.github.io/conduct.html)
+
+The batching resiliency pattern for Go.
+
+Creating a batcher takes two parameters:
+- the timeout to wait while collecting a batch
+- the function to run once a batch has been collected
+
+Each call to `Run` blocks until the batch it joined has been processed and
+returns the error produced by the batch function. A timeout of zero disables
+batching: the function is then run immediately with only that call's parameter.
+
+You can also optionally set a prefilter to reject parameters before they enter
+the batch.
+
+```go
+b := batcher.New(10*time.Millisecond, func(params []interface{}) error {
+	// do something with the batch of parameters
+	return nil
+})
+
+b.Prefilter(func(param interface{}) error {
+	// do some sort of sanity check on the parameter, and return an error if it fails
+	return nil
+})
+
+for i := 0; i < 10; i++ {
+	go b.Run(i)
+}
+```
diff --git a/gomore/batcher/batcher.go b/gomore/batcher/batcher.go
new file mode 100644
index 0000000..2d1ccb4
--- /dev/null
+++ b/gomore/batcher/batcher.go
@@ -0,0 +1,133 @@
+// Package batcher implements the batching resiliency pattern for Go.
+package batcher
+
+import (
+	"sync"
+	"time"
+)
+
+// work is a single pending call to Run: the caller's parameter and the channel
+// on which the result of its batch is delivered back to that caller.
+type work struct {
+	param  interface{}
+	future chan error
+}
+
+// Batcher implements the batching resiliency pattern.
+type Batcher struct {
+	// timeout is how long a batch collects parameters before doWork is called.
+	timeout time.Duration
+
+	// prefilter, when non-nil, checks each parameter before it is queued.
+	prefilter func(interface{}) error
+
+	// lock guards submit.
+	lock sync.Mutex
+
+	// submit queues work for the batch currently being collected; it is nil
+	// while no batch is open.
+	submit chan *work
+
+	// doWork is the caller-supplied function run once per batch.
+	doWork func([]interface{}) error
+}
+
+// New constructs a new batcher that will batch all calls to Run that occur within
+// `timeout` time before calling doWork just once for the entire batch. The doWork
+// function must be safe to run concurrently with itself as this may occur, especially
+// when the timeout is small. A timeout of zero disables batching: each call to Run
+// then invokes doWork directly with only its own parameter.
+func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
+	return &Batcher{
+		timeout: timeout,
+		doWork:  doWork,
+	}
+}
+
+// Run runs the work function with the given parameter, possibly
+// including it in a batch with other calls to Run that occur within the
+// specified timeout. It blocks until the work function has returned and
+// yields that function's error, or the prefilter's error if the parameter
+// was rejected. It is safe to call Run concurrently on the same batcher.
+func (b *Batcher) Run(param interface{}) error {
+	if b.prefilter != nil {
+		if err := b.prefilter(param); err != nil {
+			return err
+		}
+	}
+
+	if b.timeout == 0 {
+		// Batching is disabled: run the work synchronously for this parameter.
+		return b.doWork([]interface{}{param})
+	}
+
+	// The future is buffered so batch can deliver the result without blocking.
+	w := &work{
+		param:  param,
+		future: make(chan error, 1),
+	}
+
+	b.submitWork(w)
+
+	return <-w.future
+}
+
+// Prefilter specifies an optional function that can be used to run initial checks on parameters
+// passed to Run before being added to the batch. If the prefilter returns a non-nil error,
+// that error is returned immediately from Run and the batcher is not invoked. A prefilter
+// cannot safely be specified for a batcher if Run has already been invoked. The filter function
+// specified must be concurrency-safe.
+func (b *Batcher) Prefilter(filter func(interface{}) error) {
+	b.prefilter = filter
+}
+
+// submitWork queues w on the batch currently being collected, first opening a
+// new batch and starting its goroutine if none is open.
+func (b *Batcher) submitWork(w *work) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	if b.submit == nil {
+		b.submit = make(chan *work, 4)
+		go b.batch()
+	}
+
+	// Sending while holding the lock keeps timer from closing the channel
+	// between the nil check above and this send.
+	b.submit <- w
+}
+
+// batch collects queued work until timer closes the queue, calls doWork once
+// with everything collected, and delivers the result to every waiting caller.
+func (b *Batcher) batch() {
+	var params []interface{}
+	var futures []chan error
+	// Capture the channel now: timer sets b.submit back to nil when it closes it.
+	input := b.submit
+
+	go b.timer()
+
+	for w := range input {
+		params = append(params, w.param)
+		futures = append(futures, w.future)
+	}
+
+	ret := b.doWork(params)
+
+	for _, future := range futures {
+		future <- ret
+		close(future)
+	}
+}
+
+// timer waits out the batch timeout, then closes the queue so batch stops
+// collecting and clears it so the next Run opens a fresh batch.
+func (b *Batcher) timer() {
+	time.Sleep(b.timeout)
+
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	close(b.submit)
+	b.submit = nil
+}
diff --git a/gomore/batcher/batcher_test.go b/gomore/batcher/batcher_test.go
new file mode 100644
index 0000000..f1b8d40
--- /dev/null
+++ b/gomore/batcher/batcher_test.go
@@ -0,0 +1,126 @@
+package batcher
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+var errSomeError = errors.New("errSomeError")
+
+func returnsError(params []interface{}) error {
+	return errSomeError
+}
+
+func returnsSuccess(params []interface{}) error {
+	return nil
+}
+
+func TestBatcherSuccess(t *testing.T) {
+	b := New(10*time.Millisecond, returnsSuccess)
+
+	wg := &sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := b.Run(nil); err != nil {
+				t.Error(err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	// A zero timeout bypasses batching and runs the work inline.
+	b = New(0, returnsSuccess)
+	for i := 0; i < 10; i++ {
+		if err := b.Run(nil); err != nil {
+			t.Error(err)
+		}
+	}
+}
+
+func TestBatcherError(t *testing.T) {
+	b := New(10*time.Millisecond, returnsError)
+
+	wg := &sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := b.Run(nil); err != errSomeError {
+				t.Error(err)
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func TestBatcherPrefilter(t *testing.T) {
+	b := New(1*time.Millisecond, returnsSuccess)
+
+	b.Prefilter(func(param interface{}) error {
+		if param == nil {
+			return errSomeError
+		}
+		return nil
+	})
+
+	if err := b.Run(nil); err != errSomeError {
+		t.Error(err)
+	}
+
+	if err := b.Run(1); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestBatcherMultipleBatches(t *testing.T) {
+	var iters uint32
+
+	b := New(10*time.Millisecond, func(params []interface{}) error {
+		atomic.AddUint32(&iters, 1)
+		return nil
+	})
+
+	wg := &sync.WaitGroup{}
+
+	for group := 0; group < 5; group++ {
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				if err := b.Run(nil); err != nil {
+					t.Error(err)
+				}
+			}()
+		}
+		// Sleep past the batch timeout so each group lands in its own batch.
+		time.Sleep(15 * time.Millisecond)
+	}
+
+	wg.Wait()
+
+	// Read with the same atomic discipline the writers use.
+	if got := atomic.LoadUint32(&iters); got != 5 {
+		t.Error("Wrong number of iters:", got)
+	}
+}
+
+func ExampleBatcher() {
+	b := New(10*time.Millisecond, func(params []interface{}) error {
+		// do something with the batch of parameters
+		return nil
+	})
+
+	b.Prefilter(func(param interface{}) error {
+		// do some sort of sanity check on the parameter, and return an error if it fails
+		return nil
+	})
+
+	for i := 0; i < 10; i++ {
+		go b.Run(i)
+	}
+}