From 26549a1e8807e9a226c987d4e83e32ce6d692880 Mon Sep 17 00:00:00 2001 From: paulaan Date: Tue, 23 Apr 2019 15:24:12 +0700 Subject: [PATCH] [fanout] Break worker from higher index --- messaging/fanout.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index e52785b..89e53d5 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -29,18 +29,20 @@ func (p *Pipeline) Start() { for { for _, c := range cs { expectationWorkers := uint32(len(ch)/(MaxQueueSize/MaxWorkers)) + 1 + runningWorker := atomic.LoadUint32(&running) + if c.index > runningWorker+1 || c.index > expectationWorkers { + break + } select { case val := <-ch: - runningWorker := atomic.LoadUint32(&running) - if c.index <= runningWorker || c.index <= expectationWorkers { - writer = c - } + writer = c if c.debug { log.Info("Worker receiving", zap.Any("index", writer.index), zap.Any("running", runningWorker), zap.Any("no# workers", expectationWorkers)) } go writer.stream(val) } } + } } }