[fanout] Organize fan-out pipeline

This commit is contained in:
paulaan 2019-05-02 17:27:28 +07:00
parent 42e2301e29
commit c033afdbea

View File

@ -8,45 +8,39 @@ import (
"sync/atomic" "sync/atomic"
) )
const (
MaxWorkers = 32
MaxQueueSize = 128
)
var ( var (
log, _ = zap.NewDevelopment() log, _ = zap.NewDevelopment()
) )
const (
MaxWorkers = 16
MaxQueueSize = 512
MasterQueueSize = MaxQueueSize * MaxWorkers
)
type Pipeline struct { type Pipeline struct {
workers []*worker workers map[int]*worker
chain chan interface{} chain chan interface{}
running *uint32
} }
func (p *Pipeline) Start() { func (p *Pipeline) Start() {
distributeToChannels := func(ch chan interface{}, cs []*worker) { go func(pipe *Pipeline) {
writer := cs[0] //first worker must stream as default
for { for {
for _, c := range cs { expectationWorkers := len(pipe.chain)
expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 if expectationWorkers > MaxWorkers {
runningWorker := atomic.LoadUint32(p.running) expectationWorkers = expectationWorkers % MaxWorkers
if c.index > runningWorker+1 || c.index > expectationWorkers { }
for _, c := range pipe.workers {
if expectationWorkers < int(c.index) {
break break
} }
select { select {
case val := <-ch: case val := <-pipe.chain:
writer = c go c.stream(val)
if c.debug {
log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers))
}
go writer.stream(val)
} }
} }
} }
} }(p)
}
go distributeToChannels(p.chain, p.workers)
} }
func (p *Pipeline) Dispatch(msg interface{}) { func (p *Pipeline) Dispatch(msg interface{}) {
@ -56,23 +50,19 @@ func (p *Pipeline) Dispatch(msg interface{}) {
type DispatcherBuilder func() Dispatcher type DispatcherBuilder func() Dispatcher
func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline {
wk := make([]*worker, 0, MaxWorkers) ch := make(chan interface{}, MasterQueueSize)
ch := make(chan interface{}, 4096) wk := make(map[int]*worker)
pipe := &Pipeline{chain: ch, running: new(uint32)}
for i := 0; i < MaxWorkers; i++ { for i := 0; i < MaxWorkers; i++ {
wk = append(wk, wk[i] = &worker{
&worker{
index: uint32(i + 1), index: uint32(i + 1),
chain: make(chan interface{}, MaxQueueSize), chain: make(chan interface{}, MaxQueueSize),
mutex: new(sync.Mutex), mutex: new(sync.Mutex),
debug: debug, debug: debug,
idle: idle, idle: idle,
Dispatcher: d(), Dispatcher: d(),
broker: pipe.running,
})
} }
pipe.workers = wk }
return pipe return &Pipeline{workers: wk, chain: ch}
} }
type Dispatcher interface { type Dispatcher interface {
@ -88,7 +78,6 @@ type worker struct {
chain chan interface{} chain chan interface{}
debug bool debug bool
idle uint32 idle uint32
broker *uint32
Dispatcher Dispatcher
} }
@ -97,14 +86,7 @@ func (c *worker) stream(val interface{}) {
if !c.running { if !c.running {
c.mutex.Lock() c.mutex.Lock()
c.running = true c.running = true
atomic.AddUint32(c.broker, 1)
defer atomic.AddUint32(c.broker, ^uint32(1 - 1))
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
err := c.Before(ctx)
if err != nil {
log.Error("can not start worker", zap.Error(err))
}
defer func(w *worker, cancel context.CancelFunc) { defer func(w *worker, cancel context.CancelFunc) {
if w.debug { if w.debug {
log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle))
@ -117,6 +99,11 @@ func (c *worker) stream(val interface{}) {
w.mutex.Unlock() w.mutex.Unlock()
w.running = false w.running = false
}(c, cancel) }(c, cancel)
err := c.Before(ctx)
if err != nil {
log.Error("can not start worker", zap.Error(err))
}
var idle uint32 = 0 var idle uint32 = 0
for { for {
select { select {
@ -140,9 +127,6 @@ func (c *worker) stream(val interface{}) {
if i > c.idle { if i > c.idle {
return return
} }
if c.debug {
log.Info("Idle", zap.Any("worker index", c.index), zap.Any("idle", idle))
}
} }
} }
} }