awesome-patterns/concurrency/pool/main.go
2018-01-14 18:33:59 +10:00

198 lines
6.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"errors"
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Pool manages a set of resources that can be shared safely by
// multiply goroutines. The resource been managed must implement
// the io.Closer interface.
// declares a struct named Pool that allows
// the caller to create as many different pools as needed. Each pool can manage any type
// of resource as long as the type implements the io.Closer interface.
type Pool struct {
// This mutex is used to keep all the operations against a Pool value-safe for multigoroutine access.
m sync.Mutex
// The second field is named resources and is declared as a channel of interface type io.Closer.
// This channel will be created as a buffered channel and will contain the resources being shared.
// Because an interface type is being used, the pool can manage any type of resource that
// implements the io.Closer interface
resources chan io.Closer
// The factory field is of a function type. Any function that takes no parameters and
// returns an io.Closer and an error interface value can be assigned to this field. The
// purpose of this function is to create a new resource when the pool requires one. This
// functionality is an implementation detail beyond the scope of the pool package and
// needs to be implemented and supplied by the user using this package.
factory func() (io.Closer, error)
// This field is a flag that indicates the Pool is being shut down or is already shut down.
closed bool
}
// ErrPoolClosed is returned when an Acquire returns on a closed pool.
// Creating error interface variables is a common practice in Go. This allows the caller
// to identify specific returned error values from any function or method within the package.
var ErrPoolClosed = errors.New("Pool has been closed")
// New creates a pool that manage resources. A pool requires a function
// that can allocate a new resource and the size of the pool.
// The function parameter represents a factory function that creates values of the resource being managed by the pool.
// The second parameter, size, represents the size of the buffered channel created to hold the resources.
func New(fn func() (io.Closer, error), size uint32) (*Pool, error) {
// The first parameter, fn, is declared as a function type that accepts no parameters and
// returns an io.Closer and an error interface value
if size <= 0 {
return nil, errors.New("Size value too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// Acquire retrieves a resource from the pool.
// This method returns a resource from the pool if one is available, or creates a new one for the call.
func (p *Pool) Acquire() (io.Closer, error) {
select {
// Check for free resources
// using a select / case statement to check if theres a resource in the buffered channel.
case r, ok := <-p.resources:
if !ok {
return nil, ErrPoolClosed
}
log.Println("Aquire: ", "Shared Resource")
return r, nil
default:
log.Println("Aquire: ", "New Resource")
return p.factory()
}
}
// Release place a new resource into the pool.
// After a resource is acquired and no longer needed, it must be released back into
// the pool. This is where the Release method comes in.
func (p *Pool) Release(r io.Closer) {
// Secure this operation with the Close operation.
// The use of the mutex serves two purposes. First, it protects the
// read on the closed flag on line 65 from happening at the same time as a write on this
// flag in the Close method. Second, we dont want to attempt to send on a closed channel because this will cause a panic. When the closed
// field is false, we know the resources channel has been closed
p.m.Lock()
defer p.m.Unlock()
// If the pool is closed, discard the resource
// the Close method on the resource is called directly when the pool is
// closed. This is because theres no way to release the resource back into the pool. At
// this point the pool has been both closed and flushed.
if p.closed {
r.Close()
return
}
select {
// Attempt to place the new resource on the queue
case p.resources <- r:
log.Println("Release: ", "In Queue")
default:
// If the queue is already at capacity we close the resource
log.Println("Release:", "Closing")
r.Close()
}
}
// Close will shutdown the pool and close all existing resources.
// Once the program is finished with the pool, it should call the Close method.
// The method closes and flushes the buffered channel on lines 98 and 101, closing any resources that exist until the channel is
// empty. All the code in this method must be executed by only one goroutine at a time.
// In fact, when this code is being executed, goroutines must also be
// prevented from executing code in the Release method. Youll understand why this is important soon
func (p *Pool) Close() {
// Secure this operation with release operation.
p.m.Lock()
defer p.m.Unlock()
// If the pool is already closed, do not do anything.
if p.closed {
return
}
// set the pool as closed.
p.closed = true
// Close the channel before we drain the channel of its
// resources
close(p.resources)
// Close the resources
for r := range p.resources {
r.Close()
}
}
const (
maxGoroutines = 25
pooledResources = 2
)
// dbConnection simulates a resource to share.
type dbConnection struct {
ID int32
}
// Close implements the io.Closer interface so dbConnection
// can be managed by the pool. Close performs any resource
// release management.
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
// idCounter provides support for giving each connection a unique id.
var idCounter int32
// createConnection is a factory method that will be called by
// the pool when a new connection is needed.
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// Create the pool to manage our connections.
p, err := New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
// Perform queries using connections from the pool.
for query := 0; query < maxGoroutines; query++ {
// Each goroutine needs its own copy of the query
// value else they will all be sharing the same query
// variable.
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
// Wait for the goroutines to finish.
wg.Wait()
// Close the pool.
log.Println("Shutdown Program.")
p.Close()
}
// performQueries tests the resource pool of connections.
func performQueries(query int, p *Pool) {
// Acquire a connection from the pool.
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// Release the connection back to the pool.
defer p.Release(conn)
// Wait to simulate a query response.
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}