diff --git a/messaging/fanout.go b/messaging/fanout.go index 6bbd2be..3d56805 100644 --- a/messaging/fanout.go +++ b/messaging/fanout.go @@ -22,21 +22,21 @@ type Pipeline struct { chain chan interface{} } -func (p *Pipeline) Start() { +func (p *Pipeline) Start(ctx context.Context) { go func(pipe *Pipeline) { for { - expectationWorkers := len(pipe.chain) - if expectationWorkers > MaxWorkers { - expectationWorkers = expectationWorkers % MaxWorkers + expectationWorkers := len(pipe.chain) % MaxWorkers + if expectationWorkers >= MaxWorkers { + expectationWorkers = 0 } - for _, c := range pipe.workers { - if expectationWorkers < int(c.index) { - break - } - select { - case val := <-pipe.chain: - go c.stream(val) + select { + case <-ctx.Done(): + return + case val, ok := <-pipe.chain: + if !ok { + return } + go pipe.workers[expectationWorkers].stream(val) } } }(p)