1
0
mirror of https://github.com/tmrts/go-patterns.git synced 2025-02-20 14:23:13 +03:00

Add Producer Consumer Part

There are three parts changed:

1. Add producer_consumer.md and update the README.md in root dir.

2. Add producer_consumer package:

  We can use assign tasks to producer after producer starts.

  Producer has limit buffer for task. If the buffer is full and no
  consumer to take, the producer will ignore the task.

  Consumer needs to regiester if it wants to do task for producer.

  All the communication is based on RPC.

3. Add example for producer_consumer package.

  The example will show the case that the buffer of producer is full.
This commit is contained in:
Wei Fu 2017-03-30 16:30:19 +08:00
parent 399a51c7fe
commit 28cf6c4cfc
7 changed files with 290 additions and 1 deletions

View File

@ -68,7 +68,7 @@ A curated collection of idiomatic design & application patterns for Go language.
| [Generators](/concurrency/generator.md) | Yields a sequence of values one at a time | ✔ |
| [Reactor](/concurrency/reactor.md) | Demultiplexes service requests delivered concurrently to a service handler and dispatches them syncronously to the associated request handlers | ✘ |
| [Parallelism](/concurrency/parallelism.md) | Completes large number of independent tasks | ✔ |
| [Producer Consumer](/concurrency/producer_consumer.md) | Separates tasks from task executions | |
| [Producer Consumer](/concurrency/producer_consumer.md) | Separates tasks from task executions | |
## Messaging Patterns

View File

@ -0,0 +1,39 @@
package main
import (
pc "./producer_consumer"
"strconv"
"time"
)
func main() {
// unix socket address
pAddr := "/tmp/producer"
cAddr1 := "/tmp/consumer1"
cAddr2 := "/tmp/consumer2"
// producer buffer size
bufSize := 2
// start producer
producer := pc.NewProducer(pAddr, bufSize)
pc.StartProducer(producer)
// generate tasks
for i := 0; i < 10; i++ {
go func(num int) {
pc.EnqueueTask(pAddr, pc.Task{strconv.Itoa(num)})
}(i)
}
// start three consumers
consumer1 := pc.NewConsumer(cAddr1)
pc.StartConsumer(consumer1, pAddr)
consumer2 := pc.NewConsumer(cAddr2)
pc.StartConsumer(consumer2, pAddr)
time.Sleep(30 * time.Second)
pc.Shutdown(pAddr, true)
pc.Shutdown(cAddr1, false)
pc.Shutdown(cAddr2, false)
}

View File

@ -0,0 +1,12 @@
# Producer Consumer
What is the producer-consumer pattern?
> In computing, the producerconsumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer. --- from [wikipedia](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem)
## Implementaion
More information can be found in [producer_consumer package](producer_consumer).
## Example
An example can be found in [producer_consumer.go](producer_consumer.go).

View File

@ -0,0 +1,67 @@
package producer_consumer
import (
"errors"
"log"
"net"
"net/rpc"
"os"
"time"
)
const (
TimeoutEnqueueTask = 5 * time.Second
TimeoutRegister = 5 * time.Second
TimeoutDial = 5 * time.Second
TimeoutShutdown = 5 * time.Second
ProcessDuration = 2 * time.Second
)
var (
ErrorTEnqueueTask = errors.New("Timeout for enqueuing task")
ErrorTRegister = errors.New("Timeout for registering")
ErrorTDial = errors.New("Timeout for dailing")
LogInfo = log.New(os.Stdout, "[Info] ", log.Ltime)
LogError = log.New(os.Stderr, "[Error] ", log.Ltime)
)
func Shutdown(address string, isProducer bool) {
serviceMethod := "Producer.Shutdown"
if !isProducer {
serviceMethod = "Consumer.Shutdown"
}
err := rpcCall(address, serviceMethod, struct{}{}, &struct{}{})
if err != nil {
LogError.Println(err)
}
}
func rpcCall(address string, serviceMethod string, args interface{}, reply *struct{}) error {
conn, err := net.DialTimeout("unix", address, TimeoutDial)
if err != nil {
return err
}
defer conn.Close()
client := rpc.NewClient(conn)
err = client.Call(serviceMethod, args, reply)
if err != nil {
return err
}
return nil
}
func serverAccept(server *rpc.Server, l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
LogError.Println(err)
return
}
go server.ServeConn(conn)
}
}

View File

@ -0,0 +1,55 @@
package producer_consumer
import (
"net"
"net/rpc"
"os"
"time"
)
type Consumer struct {
address string
l net.Listener
}
func NewConsumer(address string) *Consumer {
return &Consumer{address: address}
}
func StartConsumer(c *Consumer, pAddr string) {
server := rpc.NewServer()
server.Register(c)
os.Remove(c.address)
l, err := net.Listen("unix", c.address)
if err != nil {
LogError.Println(err)
}
c.l = l
go serverAccept(server, c.l)
err = rpcCall(pAddr, "Producer.Register", c.address, &struct{}{})
if err != nil {
LogError.Println(err)
c.l.Close()
}
}
func (c *Consumer) DoTask(task Task, _ *struct{}) error {
// takes time to do the task
time.Sleep(ProcessDuration)
LogInfo.Printf("[%s] finish task[%s] successfully!", c.address, task.String())
return nil
}
func (c *Consumer) Shutdown(_ struct{}, _ *struct{}) error {
if err := c.l.Close(); err != nil {
return err
}
LogInfo.Printf("[%s] shutdown successfully!", c.address)
return nil
}

View File

@ -0,0 +1,106 @@
package producer_consumer
import (
"net"
"net/rpc"
"os"
"time"
)
type Producer struct {
address string
tasks chan Task
done chan struct{}
consumers chan string
l net.Listener
}
func NewProducer(address string, capacity int) *Producer {
return &Producer{
address: address,
tasks: make(chan Task, capacity),
consumers: make(chan string, capacity),
done: make(chan struct{}, 0),
}
}
func StartProducer(p *Producer) {
server := rpc.NewServer()
server.Register(p)
os.Remove(p.address)
l, err := net.Listen("unix", p.address)
if err != nil {
LogError.Println(err)
}
p.l = l
go serverAccept(server, p.l)
go p.schedule()
}
func EnqueueTask(pAddr string, task Task) {
err := rpcCall(pAddr, "Producer.Enqueue", task, &struct{}{})
if err != nil {
LogError.Println(err)
}
}
func (p *Producer) schedule() {
for {
select {
case task := <-p.tasks:
// one consumer consumes one job at one time
consumer := <-p.consumers
// if rpcCall fails, this consumer will be regarded as unavailable.
err := rpcCall(consumer, "Consumer.DoTask", task, &struct{}{})
if err != nil {
LogError.Printf("[%s] %s", p.address, err.Error())
} else { // re-register consumer
go func(consumer string) {
select {
case p.consumers <- consumer:
case <-time.After(5 * time.Second):
LogError.Printf("[%s] %s", p.address, ErrorTRegister)
}
}(consumer)
}
case <-p.done:
break
}
}
}
func (p *Producer) Enqueue(task Task, _ *struct{}) error {
select {
case p.tasks <- task:
LogInfo.Printf("[%s] enqueue task[%s] successfully!", p.address, task.String())
case <-time.After(TimeoutEnqueueTask):
return ErrorTEnqueueTask
}
return nil
}
func (p *Producer) Register(address string, _ *struct{}) error {
select {
case p.consumers <- address:
LogInfo.Printf("[%s] register consumer[%s] successfully!", p.address, address)
case <-time.After(TimeoutRegister):
return ErrorTRegister
}
return nil
}
func (p *Producer) Shutdown(_ struct{}, _ *struct{}) error {
p.done <- struct{}{}
if err := p.l.Close(); err != nil {
return err
}
LogInfo.Printf("[%s] shutdown successfully!\n", p.address)
return nil
}

View File

@ -0,0 +1,10 @@
package producer_consumer
// Add whatever you want
type Task struct {
Name string
}
func (t *Task) String() string {
return t.Name
}