diff --git a/messaging/fanout.go b/messaging/fanout.go index e52785b..89e53d5 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -29,18 +29,20 @@ func (p *Pipeline) Start() { for { for _, c := range cs { expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + runningWorker := atomic.LoadUint32(&running) + if c.index > runningWorker+1 || c.index > expectationWorkers { + break + } select { case val := <-ch: - runningWorker := atomic.LoadUint32(&running) - if c.index <= runningWorker || c.index <= expectationWorkers { - writer = c - } + writer = c if c.debug { log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) } go writer.stream(val) } } + } } }