awesome-patterns/workerpool/pool/main.go

198 lines
6.9 KiB
Go
Raw Permalink Normal View History

2018-01-14 10:37:45 +03:00
package main
import (
"errors"
"io"
"log"
2018-01-14 11:33:59 +03:00
"math/rand"
2018-01-14 10:37:45 +03:00
"sync"
2018-01-14 11:33:59 +03:00
"sync/atomic"
"time"
2018-01-14 10:37:45 +03:00
)
// Pool manages a set of resources that can be shared safely by
// multiply goroutines. The resource been managed must implement
// the io.Closer interface.
2018-01-14 11:22:50 +03:00
// declares a struct named Pool that allows
// the caller to create as many different pools as needed. Each pool can manage any type
// of resource as long as the type implements the io.Closer interface.
2018-01-14 10:37:45 +03:00
type Pool struct {
2018-01-14 11:22:50 +03:00
// This mutex is used to keep all the operations against a Pool value-safe for multigoroutine access.
m sync.Mutex
// The second field is named resources and is declared as a channel of interface type io.Closer.
// This channel will be created as a buffered channel and will contain the resources being shared.
// Because an interface type is being used, the pool can manage any type of resource that
// implements the io.Closer interface
2018-01-14 10:37:45 +03:00
resources chan io.Closer
2018-01-14 11:22:50 +03:00
// The factory field is of a function type. Any function that takes no parameters and
// returns an io.Closer and an error interface value can be assigned to this field. The
// purpose of this function is to create a new resource when the pool requires one. This
// functionality is an implementation detail beyond the scope of the pool package and
// needs to be implemented and supplied by the user using this package.
factory func() (io.Closer, error)
// This field is a flag that indicates the Pool is being shut down or is already shut down.
closed bool
2018-01-14 10:37:45 +03:00
}
// ErrPoolClosed is returned when an Acquire returns on a closed pool.
2018-01-14 11:22:50 +03:00
// Creating error interface variables is a common practice in Go. This allows the caller
// to identify specific returned error values from any function or method within the package.
2018-01-14 10:37:45 +03:00
var ErrPoolClosed = errors.New("Pool has been closed")
// New creates a pool that manage resources. A pool requires a function
// that can allocate a new resource and the size of the pool.
2018-01-14 11:22:50 +03:00
// The function parameter represents a factory function that creates values of the resource being managed by the pool.
// The second parameter, size, represents the size of the buffered channel created to hold the resources.
2018-01-14 10:37:45 +03:00
func New(fn func() (io.Closer, error), size uint32) (*Pool, error) {
2018-01-14 11:22:50 +03:00
// The first parameter, fn, is declared as a function type that accepts no parameters and
// returns an io.Closer and an error interface value
2018-01-14 10:37:45 +03:00
if size <= 0 {
return nil, errors.New("Size value too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// Acquire retrieves a resource from the pool.
2018-01-14 11:22:50 +03:00
// This method returns a resource from the pool if one is available, or creates a new one for the call.
2018-01-14 10:37:45 +03:00
func (p *Pool) Acquire() (io.Closer, error) {
select {
// Check for free resources
2018-01-14 11:22:50 +03:00
// using a select / case statement to check if theres a resource in the buffered channel.
2018-01-14 10:37:45 +03:00
case r, ok := <-p.resources:
if !ok {
return nil, ErrPoolClosed
}
log.Println("Aquire: ", "Shared Resource")
return r, nil
default:
log.Println("Aquire: ", "New Resource")
return p.factory()
}
}
// Release place a new resource into the pool.
2018-01-14 11:22:50 +03:00
// After a resource is acquired and no longer needed, it must be released back into
// the pool. This is where the Release method comes in.
2018-01-14 10:37:45 +03:00
func (p *Pool) Release(r io.Closer) {
// Secure this operation with the Close operation.
2018-01-14 11:22:50 +03:00
// The use of the mutex serves two purposes. First, it protects the
// read on the closed flag on line 65 from happening at the same time as a write on this
// flag in the Close method. Second, we dont want to attempt to send on a closed channel because this will cause a panic. When the closed
// field is false, we know the resources channel has been closed
2018-01-14 10:37:45 +03:00
p.m.Lock()
defer p.m.Unlock()
// If the pool is closed, discard the resource
2018-01-14 11:22:50 +03:00
// the Close method on the resource is called directly when the pool is
// closed. This is because theres no way to release the resource back into the pool. At
// this point the pool has been both closed and flushed.
2018-01-14 10:37:45 +03:00
if p.closed {
r.Close()
return
}
select {
// Attempt to place the new resource on the queue
case p.resources <- r:
log.Println("Release: ", "In Queue")
default:
// If the queue is already at capacity we close the resource
log.Println("Release:", "Closing")
r.Close()
}
}
// Close will shutdown the pool and close all existing resources.
2018-01-14 11:22:50 +03:00
// Once the program is finished with the pool, it should call the Close method.
// The method closes and flushes the buffered channel on lines 98 and 101, closing any resources that exist until the channel is
// empty. All the code in this method must be executed by only one goroutine at a time.
// In fact, when this code is being executed, goroutines must also be
// prevented from executing code in the Release method. Youll understand why this is important soon
2018-01-14 10:37:45 +03:00
func (p *Pool) Close() {
// Secure this operation with release operation.
p.m.Lock()
defer p.m.Unlock()
// If the pool is already closed, do not do anything.
if p.closed {
return
}
// set the pool as closed.
p.closed = true
// Close the channel before we drain the channel of its
// resources
close(p.resources)
// Close the resources
for r := range p.resources {
r.Close()
}
}
2018-01-14 11:33:59 +03:00
const (
maxGoroutines = 25
pooledResources = 2
)
// dbConnection simulates a resource to share.
type dbConnection struct {
ID int32
}
// Close implements the io.Closer interface so dbConnection
// can be managed by the pool. Close performs any resource
// release management.
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
// idCounter provides support for giving each connection a unique id.
var idCounter int32
// createConnection is a factory method that will be called by
// the pool when a new connection is needed.
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// Create the pool to manage our connections.
p, err := New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
// Perform queries using connections from the pool.
for query := 0; query < maxGoroutines; query++ {
// Each goroutine needs its own copy of the query
// value else they will all be sharing the same query
// variable.
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
// Wait for the goroutines to finish.
wg.Wait()
// Close the pool.
log.Println("Shutdown Program.")
p.Close()
}
// performQueries tests the resource pool of connections.
func performQueries(query int, p *Pool) {
// Acquire a connection from the pool.
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// Release the connection back to the pool.
defer p.Release(conn)
// Wait to simulate a query response.
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}