diff --git a/concurrency/producer_consumer/common.go b/concurrency/producer_consumer/common.go index 4267274..75405de 100644 --- a/concurrency/producer_consumer/common.go +++ b/concurrency/producer_consumer/common.go @@ -15,7 +15,7 @@ const ( TimeoutDial = 5 * time.Second TimeoutShutdown = 5 * time.Second - ProcessDuration = 2 * time.Second + ProcessDuration = 3 * time.Second ) var ( diff --git a/concurrency/producer_consumer/producer.go b/concurrency/producer_consumer/producer.go index 0192249..0d72983 100644 --- a/concurrency/producer_consumer/producer.go +++ b/concurrency/producer_consumer/producer.go @@ -54,19 +54,21 @@ func (p *Producer) schedule() { // one consumer consumes one job at one time consumer := <-p.consumers - // if rpcCall fails, this consumer will be regarded as unavailable. - err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{}) - if err != nil { - LogError.Printf("[%s] %s", p.address, err.Error()) - } else { // re-register consumer - go func(consumer string) { - select { - case p.consumers <- consumer: - case <-time.After(5 * time.Second): - LogError.Printf("[%s] %s", p.address, ErrorTRegister) - } - }(consumer) - } + go func(consumer string) { + // if rpcCall fails, this consumer will be regarded as unavailable. + err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{}) + if err != nil { + LogError.Printf("[%s] %s", p.address, err.Error()) + } else { // re-register consumer + go func(consumer string) { + select { + case p.consumers <- consumer: + case <-time.After(5 * time.Second): + LogError.Printf("[%s] %s", p.address, ErrorTRegister) + } + }(consumer) + } + }(consumer) case <-p.done: break }