1
0
mirror of https://github.com/tmrts/go-patterns.git synced 2024-11-25 06:26:06 +03:00

Producer Consumer Part

Like serverAccpet, create routine for consumer if there is available
consumer.
This commit is contained in:
Wei Fu 2017-03-30 21:21:58 +08:00
parent abba91a6ac
commit c1b89ad941
2 changed files with 16 additions and 14 deletions

View File

@ -15,7 +15,7 @@ const (
TimeoutDial = 5 * time.Second
TimeoutShutdown = 5 * time.Second
ProcessDuration = 2 * time.Second
ProcessDuration = 3 * time.Second
)
var (

View File

@ -54,19 +54,21 @@ func (p *Producer) schedule() {
// one consumer consumes one job at one time
consumer := <-p.consumers
// if rpcCall fails, this consumer will be regarded as unavailable.
err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{})
if err != nil {
LogError.Printf("[%s] %s", p.address, err.Error())
} else { // re-register consumer
go func(consumer string) {
select {
case p.consumers <- consumer:
case <-time.After(5 * time.Second):
LogError.Printf("[%s] %s", p.address, ErrorTRegister)
}
}(consumer)
}
go func(consumer string) {
// if rpcCall fails, this consumer will be regarded as unavailable.
err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{})
if err != nil {
LogError.Printf("[%s] %s", p.address, err.Error())
} else { // re-register consumer
go func(consumer string) {
select {
case p.consumers <- consumer:
case <-time.After(5 * time.Second):
LogError.Printf("[%s] %s", p.address, ErrorTRegister)
}
}(consumer)
}
}(consumer)
case <-p.done:
break
}