From 44115024e2dcc995afb534be4a688b8019e30084 Mon Sep 17 00:00:00 2001 From: paulaan Date: Mon, 6 May 2019 21:43:42 +0700 Subject: [PATCH] [fanout] Optimize fanout --- messaging/fanout.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/messaging/fanout.go b/messaging/fanout.go index 6bbd2be..3d56805 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -22,21 +22,21 @@ type Pipeline struct { chain chan interface{} } -func (p *Pipeline) Start() { +func (p *Pipeline) Start(ctx context.Context) { go func(pipe *Pipeline) { for { - expectationWorkers := len(pipe.chain) - if expectationWorkers > MaxWorkers { - expectationWorkers = expectationWorkers % MaxWorkers + expectationWorkers := len(pipe.chain) % MaxWorkers + if expectationWorkers >= MaxWorkers { + expectationWorkers = 0 } - for _, c := range pipe.workers { - if expectationWorkers < int(c.index) { - break - } - select { - case val := <-pipe.chain: - go c.stream(val) + select { + case <-ctx.Done(): + return + case val, ok := <-pipe.chain: + if !ok { + return } + go pipe.workers[expectationWorkers].stream(val) } } }(p)