add batcher pattern basic code

This commit is contained in:
Edward 2020-05-03 10:52:55 +08:00
parent 76dfaf7a59
commit c3aebef2ff
3 changed files with 262 additions and 0 deletions

31
gomore/batcher/README.md Normal file
View File

@ -0,0 +1,31 @@
batcher
=======
[![Build Status](https://travis-ci.org/eapache/go-resiliency.svg?branch=master)](https://travis-ci.org/eapache/go-resiliency)
[![GoDoc](https://godoc.org/github.com/eapache/go-resiliency/batcher?status.svg)](https://godoc.org/github.com/eapache/go-resiliency/batcher)
[![Code of Conduct](https://img.shields.io/badge/code%20of%20conduct-active-blue.svg)](https://eapache.github.io/conduct.html)
The batching resiliency pattern for Go.
Creating a batcher takes two parameters:
- the timeout to wait while collecting a batch
- the function to run once a batch has been collected
You can also optionally set a prefilter to fail queries before they enter the
batch.
```go
b := batcher.New(10*time.Millisecond, func(params []interface{}) error {
// do something with the batch of parameters
return nil
})
b.Prefilter(func(param interface{}) error {
// do some sort of sanity check on the parameter, and return an error if it fails
return nil
})
for i := 0; i < 10; i++ {
go b.Run(i)
}
```

108
gomore/batcher/batcher.go Normal file
View File

@ -0,0 +1,108 @@
// Package batcher implements the batching resiliency pattern for Go.
package batcher
import (
"sync"
"time"
)
// work is one queued invocation of Run: the caller's parameter plus a
// future channel on which that caller will receive the batch's result.
type work struct {
	param  interface{} // value passed to Run
	future chan error  // receives the batch's doWork result, then is closed
}
// Batcher implements the batching resiliency pattern.
type Batcher struct {
	timeout   time.Duration           // window during which calls to Run are collected into one batch
	prefilter func(interface{}) error // optional check run on each param before batching; nil means no check

	lock   sync.Mutex // guards submit
	submit chan *work // feed for the in-flight batch; nil when no batch is currently open
	doWork func([]interface{}) error // user-supplied function invoked once per collected batch
}
// New constructs a new batcher that will batch all calls to Run that occur within
// `timeout` time before calling doWork just once for the entire batch. The doWork
// function must be safe to run concurrently with itself as this may occur, especially
// when the timeout is small.
func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
	b := &Batcher{}
	b.timeout = timeout
	b.doWork = doWork
	return b
}
// Run runs the work function with the given parameter, possibly
// including it in a batch with other calls to Run that occur within the
// specified timeout. It is safe to call Run concurrently on the same batcher.
func (b *Batcher) Run(param interface{}) error {
	// Reject the parameter up front if a prefilter has been installed.
	if b.prefilter != nil {
		if err := b.prefilter(param); err != nil {
			return err
		}
	}

	// A zero timeout disables batching entirely: run synchronously.
	if b.timeout == 0 {
		return b.doWork([]interface{}{param})
	}

	item := &work{
		param:  param,
		future: make(chan error, 1),
	}
	b.submitWork(item)

	// Block until the batch containing this item has been processed.
	return <-item.future
}
// Prefilter specifies an optional function that can be used to run initial checks on parameters
// passed to Run before being added to the batch. If the prefilter returns a non-nil error,
// that error is returned immediately from Run and the batcher is not invoked. A prefilter
// cannot safely be specified for a batcher if Run has already been invoked. The filter function
// specified must be concurrency-safe.
func (b *Batcher) Prefilter(filter func(interface{}) error) {
	// Unsynchronized write: safe only because callers must set the prefilter
	// before the first Run, per the contract above.
	b.prefilter = filter
}
// submitWork enqueues one item, lazily opening a new batch if none is in flight.
func (b *Batcher) submitWork(item *work) {
	b.lock.Lock()
	defer b.lock.Unlock()

	// First submission after a flush: create the feed channel and spawn the
	// goroutine that will collect and execute this batch.
	if b.submit == nil {
		b.submit = make(chan *work, 4)
		go b.batch()
	}

	b.submit <- item
}
// batch drains one batch's feed channel, runs doWork once over everything
// collected, and fans the single result out to every waiting caller.
func (b *Batcher) batch() {
	var (
		collected []interface{}
		waiters   []chan error
	)

	// Capture the channel before starting the timer: the timer will nil out
	// b.submit when it closes this batch's feed.
	input := b.submit
	go b.timer()

	// Collect until the timer closes the feed.
	for item := range input {
		collected = append(collected, item.param)
		waiters = append(waiters, item.future)
	}

	result := b.doWork(collected)

	// Deliver the shared result to every caller blocked in Run.
	for _, future := range waiters {
		future <- result
		close(future)
	}
}
// timer waits out the batch window, then closes the feed so batch() flushes,
// and resets submit so the next Run opens a fresh batch.
func (b *Batcher) timer() {
	<-time.After(b.timeout)

	b.lock.Lock()
	defer b.lock.Unlock()

	close(b.submit)
	b.submit = nil
}

View File

@ -0,0 +1,123 @@
package batcher
import (
"errors"
"sync"
"sync/atomic"
"testing"
"time"
)
// errSomeError is the sentinel error returned by the failing fixture.
var errSomeError = errors.New("errSomeError")

// returnsError is a doWork fixture that always fails.
func returnsError(_ []interface{}) error {
	return errSomeError
}

// returnsSuccess is a doWork fixture that always succeeds.
func returnsSuccess(_ []interface{}) error {
	return nil
}
// TestBatcherSuccess exercises both modes: batched (non-zero timeout) with
// concurrent callers, and synchronous (zero timeout) with serial callers.
func TestBatcherSuccess(t *testing.T) {
	b := New(10*time.Millisecond, returnsSuccess)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := b.Run(nil); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()

	// Zero timeout: every Run executes immediately without batching.
	b = New(0, returnsSuccess)
	for i := 0; i < 10; i++ {
		if err := b.Run(nil); err != nil {
			t.Error(err)
		}
	}
}
// TestBatcherError checks that every caller in a batch receives the single
// error returned by doWork.
func TestBatcherError(t *testing.T) {
	b := New(10*time.Millisecond, returnsError)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := b.Run(nil); err != errSomeError {
				t.Error(err)
			}
		}()
	}
	wg.Wait()
}
// TestBatcherPrefilter checks that a prefilter rejection short-circuits Run
// while parameters that pass the filter still reach doWork.
func TestBatcherPrefilter(t *testing.T) {
	b := New(1*time.Millisecond, returnsSuccess)

	// Reject nil parameters before they enter a batch.
	b.Prefilter(func(param interface{}) error {
		if param != nil {
			return nil
		}
		return errSomeError
	})

	if err := b.Run(nil); err != errSomeError {
		t.Error(err)
	}
	if err := b.Run(1); err != nil {
		t.Error(err)
	}
}
// TestBatcherMultipleBatches verifies that calls spaced further apart than the
// batch window are flushed as separate batches: five groups of ten calls with
// a 10ms window, separated by 15ms sleeps, must invoke doWork exactly five times.
func TestBatcherMultipleBatches(t *testing.T) {
	var iters uint32
	b := New(10*time.Millisecond, func(params []interface{}) error {
		atomic.AddUint32(&iters, 1)
		return nil
	})

	wg := &sync.WaitGroup{}
	for group := 0; group < 5; group++ {
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				if err := b.Run(nil); err != nil {
					t.Error(err)
				}
				wg.Done()
			}()
		}
		// Let the current batch's window expire before starting the next group.
		time.Sleep(15 * time.Millisecond)
	}
	wg.Wait()

	// Read with atomic.LoadUint32 for consistency with the atomic increments:
	// mixing atomic writes with a plain read is flagged by vet/staticcheck.
	if got := atomic.LoadUint32(&iters); got != 5 {
		t.Error("Wrong number of iters:", got)
	}
}
// ExampleBatcher demonstrates typical usage: create a batcher, optionally
// install a prefilter, then call Run concurrently. Mirrors the README snippet.
// Godoc renders this body verbatim, so the code is left exactly as written.
func ExampleBatcher() {
	b := New(10*time.Millisecond, func(params []interface{}) error {
		// do something with the batch of parameters
		return nil
	})
	b.Prefilter(func(param interface{}) error {
		// do some sort of sanity check on the parameter, and return an error if it fails
		return nil
	})
	for i := 0; i < 10; i++ {
		go b.Run(i) // NOTE(review): the example never waits for these goroutines; fine for docs, not for production code
	}
}