diff --git a/README.md b/README.md index 4d7dbf4..d8baa39 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ A curated collection of idiomatic design & application patterns for Go language. | [Generators](/concurrency/generator.md) | Yields a sequence of values one at a time | ✔ | | [Reactor](/concurrency/reactor.md) | Demultiplexes service requests delivered concurrently to a service handler and dispatches them syncronously to the associated request handlers | ✘ | | [Parallelism](/concurrency/parallelism.md) | Completes large number of independent tasks | ✔ | -| [Producer Consumer](/concurrency/producer_consumer.md) | Separates tasks from task executions | ✘ | +| [Producer Consumer](/concurrency/producer_consumer.md) | Separates tasks from task executions | ✔ | ## Messaging Patterns diff --git a/concurrency/producer_consumer.go b/concurrency/producer_consumer.go new file mode 100644 index 0000000..a68cb6e --- /dev/null +++ b/concurrency/producer_consumer.go @@ -0,0 +1,39 @@ +package main + +import ( + pc "./producer_consumer" + "strconv" + "time" +) + +func main() { + // unix socket address + pAddr := "/tmp/producer" + cAddr1 := "/tmp/consumer1" + cAddr2 := "/tmp/consumer2" + + // producer buffer size + bufSize := 2 + + // start producer + producer := pc.NewProducer(pAddr, bufSize) + pc.StartProducer(producer) + + // generate tasks + for i := 0; i < 10; i++ { + go func(num int) { + pc.EnqueueTask(pAddr, pc.Task{strconv.Itoa(num)}) + }(i) + } + + // start three consumers + consumer1 := pc.NewConsumer(cAddr1) + pc.StartConsumer(consumer1, pAddr) + consumer2 := pc.NewConsumer(cAddr2) + pc.StartConsumer(consumer2, pAddr) + + time.Sleep(30 * time.Second) + pc.Shutdown(pAddr, true) + pc.Shutdown(cAddr1, false) + pc.Shutdown(cAddr2, false) +} diff --git a/concurrency/producer_consumer.md b/concurrency/producer_consumer.md new file mode 100644 index 0000000..67ff109 --- /dev/null +++ b/concurrency/producer_consumer.md @@ -0,0 +1,12 @@ +# Producer Consumer + +What is the producer-consumer pattern? +> In computing, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer. --- from [wikipedia](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem) + +## Implementaion + +More information can be found in [producer_consumer package](producer_consumer). + +## Example + +An example can be found in [producer_consumer.go](producer_consumer.go). diff --git a/concurrency/producer_consumer/common.go b/concurrency/producer_consumer/common.go new file mode 100644 index 0000000..4267274 --- /dev/null +++ b/concurrency/producer_consumer/common.go @@ -0,0 +1,67 @@ +package producer_consumer + +import ( + "errors" + "log" + "net" + "net/rpc" + "os" + "time" +) + +const ( + TimeoutEnqueueTask = 5 * time.Second + TimeoutRegister = 5 * time.Second + TimeoutDial = 5 * time.Second + TimeoutShutdown = 5 * time.Second + + ProcessDuration = 2 * time.Second +) + +var ( + ErrorTEnqueueTask = errors.New("Timeout for enqueuing task") + ErrorTRegister = errors.New("Timeout for registering") + ErrorTDial = errors.New("Timeout for dailing") + + LogInfo = log.New(os.Stdout, "[Info] ", log.Ltime) + LogError = log.New(os.Stderr, "[Error] ", log.Ltime) +) + +func Shutdown(address string, isProducer bool) { + serviceMethod := "Producer.Shutdown" + if !isProducer { + serviceMethod = "Consumer.Shutdown" + } + + err := rpcCall(address, serviceMethod, struct{}{}, &struct{}{}) + if err != nil { + LogError.Println(err) + } +} + +func rpcCall(address string, serviceMethod string, args interface{}, reply *struct{}) error { + conn, err := net.DialTimeout("unix", address, TimeoutDial) + if err != nil { + return err + } + + defer conn.Close() + + client := rpc.NewClient(conn) + err = client.Call(serviceMethod, args, reply) + if err != nil { + return err + } + return nil +} + +func serverAccept(server *rpc.Server, l net.Listener) { + for { + conn, err := l.Accept() + if err != nil { + LogError.Println(err) + return + } + go server.ServeConn(conn) + } +} diff --git a/concurrency/producer_consumer/consumer.go b/concurrency/producer_consumer/consumer.go new file mode 100644 index 0000000..8443279 --- /dev/null +++ b/concurrency/producer_consumer/consumer.go @@ -0,0 +1,55 @@ +package producer_consumer + +import ( + "net" + "net/rpc" + "os" + "time" +) + +type Consumer struct { + address string + + l net.Listener +} + +func NewConsumer(address string) *Consumer { + return &Consumer{address: address} +} + +func StartConsumer(c *Consumer, pAddr string) { + server := rpc.NewServer() + server.Register(c) + + os.Remove(c.address) + l, err := net.Listen("unix", c.address) + if err != nil { + LogError.Println(err) + } + + c.l = l + go serverAccept(server, c.l) + + err = rpcCall(pAddr, "Producer.Register", c.address, &struct{}{}) + if err != nil { + LogError.Println(err) + c.l.Close() + } +} + +func (c *Consumer) DoTask(task Task, _ *struct{}) error { + // takes time to do the task + time.Sleep(ProcessDuration) + + LogInfo.Printf("[%s] finish task[%s] successfully!", c.address, task.String()) + return nil +} + +func (c *Consumer) Shutdown(_ struct{}, _ *struct{}) error { + if err := c.l.Close(); err != nil { + return err + } + + LogInfo.Printf("[%s] shutdown successfully!", c.address) + return nil +} diff --git a/concurrency/producer_consumer/producer.go b/concurrency/producer_consumer/producer.go new file mode 100644 index 0000000..0192249 --- /dev/null +++ b/concurrency/producer_consumer/producer.go @@ -0,0 +1,106 @@ +package producer_consumer + +import ( + "net" + "net/rpc" + "os" + "time" +) + +type Producer struct { + address string + tasks chan Task + done chan struct{} + + consumers chan string + l net.Listener +} + +func NewProducer(address string, capacity int) *Producer { + return &Producer{ + address: address, + tasks: make(chan Task, capacity), + consumers: make(chan string, capacity), + done: make(chan struct{}, 0), + } +} + +func StartProducer(p *Producer) { + server := rpc.NewServer() + server.Register(p) + + os.Remove(p.address) + l, err := net.Listen("unix", p.address) + if err != nil { + LogError.Println(err) + } + + p.l = l + go serverAccept(server, p.l) + go p.schedule() +} + +func EnqueueTask(pAddr string, task Task) { + err := rpcCall(pAddr, "Producer.Enqueue", task, &struct{}{}) + if err != nil { + LogError.Println(err) + } +} + +func (p *Producer) schedule() { + for { + select { + case task := <-p.tasks: + // one consumer consumes one job at one time + consumer := <-p.consumers + + // if rpcCall fails, this consumer will be regarded as unavailable. + err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{}) + if err != nil { + LogError.Printf("[%s] %s", p.address, err.Error()) + } else { // re-register consumer + go func(consumer string) { + select { + case p.consumers <- consumer: + case <-time.After(5 * time.Second): + LogError.Printf("[%s] %s", p.address, ErrorTRegister) + } + }(consumer) + } + case <-p.done: + break + } + } +} + +func (p *Producer) Enqueue(task Task, _ *struct{}) error { + select { + case p.tasks <- task: + LogInfo.Printf("[%s] enqueue task[%s] successfully!", p.address, task.String()) + case <-time.After(TimeoutEnqueueTask): + return ErrorTEnqueueTask + } + + return nil +} + +func (p *Producer) Register(address string, _ *struct{}) error { + select { + case p.consumers <- address: + LogInfo.Printf("[%s] register consumer[%s] successfully!", p.address, address) + case <-time.After(TimeoutRegister): + return ErrorTRegister + } + + return nil +} + +func (p *Producer) Shutdown(_ struct{}, _ *struct{}) error { + p.done <- struct{}{} + if err := p.l.Close(); err != nil { + return err + } + + LogInfo.Printf("[%s] shutdown successfully!\n", p.address) + return nil +} diff --git a/concurrency/producer_consumer/task.go b/concurrency/producer_consumer/task.go new file mode 100644 index 0000000..4f6eb1f --- /dev/null +++ b/concurrency/producer_consumer/task.go @@ -0,0 +1,10 @@ +package producer_consumer + +// Add whatever you want +type Task struct { + Name string +} + +func (t *Task) String() string { + return t.Name +}