[fanout] Optimize fanout

This commit is contained in:
paulaan 2019-05-06 21:43:42 +07:00
parent 9c0c9709ff
commit 44115024e2

View File

@ -22,21 +22,21 @@ type Pipeline struct {
chain chan interface{} chain chan interface{}
} }
func (p *Pipeline) Start() { func (p *Pipeline) Start(ctx context.Context) {
go func(pipe *Pipeline) { go func(pipe *Pipeline) {
for { for {
expectationWorkers := len(pipe.chain) expectationWorkers := len(pipe.chain) % MaxWorkers
if expectationWorkers > MaxWorkers { if expectationWorkers >= MaxWorkers {
expectationWorkers = expectationWorkers % MaxWorkers expectationWorkers = 0
}
for _, c := range pipe.workers {
if expectationWorkers < int(c.index) {
break
} }
select { select {
case val := <-pipe.chain: case <-ctx.Done():
go c.stream(val) return
case val, ok := <-pipe.chain:
if !ok {
return
} }
go pipe.workers[expectationWorkers].stream(val)
} }
} }
}(p) }(p)