[fanout] Break worker from higher index

This commit is contained in:
paulaan 2019-04-23 15:24:12 +07:00
parent ab12460b5d
commit 26549a1e88

View File

@ -29,18 +29,20 @@ func (p *Pipeline) Start() {
for { for {
for _, c := range cs { for _, c := range cs {
expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1
runningWorker := atomic.LoadUint32(&running)
if c.index > runningWorker+1 || c.index > expectationWorkers {
break
}
select { select {
case val := <-ch: case val := <-ch:
runningWorker := atomic.LoadUint32(&running) writer = c
if c.index <= runningWorker || c.index <= expectationWorkers {
writer = c
}
if c.debug { if c.debug {
log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers))
} }
go writer.stream(val) go writer.stream(val)
} }
} }
}
} }
} }