mirror of
https://github.com/crazybber/awesome-patterns.git
synced 2024-11-25 06:16:03 +03:00
198 lines
6.9 KiB
Go
198 lines
6.9 KiB
Go
package main
|
||
|
||
import (
|
||
"errors"
|
||
"io"
|
||
"log"
|
||
"math/rand"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// Pool manages a set of resources that can be shared safely by
|
||
// multiply goroutines. The resource been managed must implement
|
||
// the io.Closer interface.
|
||
// declares a struct named Pool that allows
|
||
// the caller to create as many different pools as needed. Each pool can manage any type
|
||
// of resource as long as the type implements the io.Closer interface.
|
||
type Pool struct {
|
||
// This mutex is used to keep all the operations against a Pool value-safe for multigoroutine access.
|
||
m sync.Mutex
|
||
// The second field is named resources and is declared as a channel of interface type io.Closer.
|
||
// This channel will be created as a buffered channel and will contain the resources being shared.
|
||
// Because an interface type is being used, the pool can manage any type of resource that
|
||
// implements the io.Closer interface
|
||
resources chan io.Closer
|
||
// The factory field is of a function type. Any function that takes no parameters and
|
||
// returns an io.Closer and an error interface value can be assigned to this field. The
|
||
// purpose of this function is to create a new resource when the pool requires one. This
|
||
// functionality is an implementation detail beyond the scope of the pool package and
|
||
// needs to be implemented and supplied by the user using this package.
|
||
factory func() (io.Closer, error)
|
||
// This field is a flag that indicates the Pool is being shut down or is already shut down.
|
||
closed bool
|
||
}
|
||
|
||
// ErrPoolClosed is returned when an Acquire returns on a closed pool.
|
||
// Creating error interface variables is a common practice in Go. This allows the caller
|
||
// to identify specific returned error values from any function or method within the package.
|
||
var ErrPoolClosed = errors.New("Pool has been closed")
|
||
|
||
// New creates a pool that manage resources. A pool requires a function
|
||
// that can allocate a new resource and the size of the pool.
|
||
// The function parameter represents a factory function that creates values of the resource being managed by the pool.
|
||
// The second parameter, size, represents the size of the buffered channel created to hold the resources.
|
||
func New(fn func() (io.Closer, error), size uint32) (*Pool, error) {
|
||
// The first parameter, fn, is declared as a function type that accepts no parameters and
|
||
// returns an io.Closer and an error interface value
|
||
if size <= 0 {
|
||
return nil, errors.New("Size value too small")
|
||
}
|
||
return &Pool{
|
||
factory: fn,
|
||
resources: make(chan io.Closer, size),
|
||
}, nil
|
||
}
|
||
|
||
// Acquire retrieves a resource from the pool.
|
||
// This method returns a resource from the pool if one is available, or creates a new one for the call.
|
||
func (p *Pool) Acquire() (io.Closer, error) {
|
||
select {
|
||
// Check for free resources
|
||
// using a select / case statement to check if there’s a resource in the buffered channel.
|
||
case r, ok := <-p.resources:
|
||
if !ok {
|
||
return nil, ErrPoolClosed
|
||
}
|
||
log.Println("Aquire: ", "Shared Resource")
|
||
return r, nil
|
||
default:
|
||
log.Println("Aquire: ", "New Resource")
|
||
return p.factory()
|
||
}
|
||
}
|
||
|
||
// Release place a new resource into the pool.
|
||
// After a resource is acquired and no longer needed, it must be released back into
|
||
// the pool. This is where the Release method comes in.
|
||
func (p *Pool) Release(r io.Closer) {
|
||
// Secure this operation with the Close operation.
|
||
// The use of the mutex serves two purposes. First, it protects the
|
||
// read on the closed flag on line 65 from happening at the same time as a write on this
|
||
// flag in the Close method. Second, we don’t want to attempt to send on a closed channel because this will cause a panic. When the closed
|
||
// field is false, we know the resources channel has been closed
|
||
p.m.Lock()
|
||
defer p.m.Unlock()
|
||
// If the pool is closed, discard the resource
|
||
// the Close method on the resource is called directly when the pool is
|
||
// closed. This is because there’s no way to release the resource back into the pool. At
|
||
// this point the pool has been both closed and flushed.
|
||
if p.closed {
|
||
r.Close()
|
||
return
|
||
}
|
||
select {
|
||
// Attempt to place the new resource on the queue
|
||
case p.resources <- r:
|
||
log.Println("Release: ", "In Queue")
|
||
default:
|
||
// If the queue is already at capacity we close the resource
|
||
log.Println("Release:", "Closing")
|
||
r.Close()
|
||
}
|
||
}
|
||
|
||
// Close will shutdown the pool and close all existing resources.
|
||
// Once the program is finished with the pool, it should call the Close method.
|
||
// The method closes and flushes the buffered channel on lines 98 and 101, closing any resources that exist until the channel is
|
||
// empty. All the code in this method must be executed by only one goroutine at a time.
|
||
// In fact, when this code is being executed, goroutines must also be
|
||
// prevented from executing code in the Release method. You’ll understand why this is important soon
|
||
func (p *Pool) Close() {
|
||
// Secure this operation with release operation.
|
||
p.m.Lock()
|
||
defer p.m.Unlock()
|
||
// If the pool is already closed, do not do anything.
|
||
if p.closed {
|
||
return
|
||
}
|
||
// set the pool as closed.
|
||
p.closed = true
|
||
// Close the channel before we drain the channel of its
|
||
// resources
|
||
close(p.resources)
|
||
// Close the resources
|
||
for r := range p.resources {
|
||
r.Close()
|
||
}
|
||
}
|
||
|
||
const (
|
||
maxGoroutines = 25
|
||
pooledResources = 2
|
||
)
|
||
|
||
// dbConnection simulates a resource to share.
|
||
type dbConnection struct {
|
||
ID int32
|
||
}
|
||
|
||
// Close implements the io.Closer interface so dbConnection
|
||
// can be managed by the pool. Close performs any resource
|
||
// release management.
|
||
func (dbConn *dbConnection) Close() error {
|
||
log.Println("Close: Connection", dbConn.ID)
|
||
return nil
|
||
}
|
||
|
||
// idCounter provides support for giving each connection a unique id.
|
||
var idCounter int32
|
||
|
||
// createConnection is a factory method that will be called by
|
||
// the pool when a new connection is needed.
|
||
func createConnection() (io.Closer, error) {
|
||
id := atomic.AddInt32(&idCounter, 1)
|
||
log.Println("Create: New Connection", id)
|
||
return &dbConnection{id}, nil
|
||
}
|
||
func main() {
|
||
var wg sync.WaitGroup
|
||
wg.Add(maxGoroutines)
|
||
// Create the pool to manage our connections.
|
||
p, err := New(createConnection, pooledResources)
|
||
if err != nil {
|
||
log.Println(err)
|
||
}
|
||
// Perform queries using connections from the pool.
|
||
for query := 0; query < maxGoroutines; query++ {
|
||
// Each goroutine needs its own copy of the query
|
||
// value else they will all be sharing the same query
|
||
// variable.
|
||
go func(q int) {
|
||
performQueries(q, p)
|
||
wg.Done()
|
||
}(query)
|
||
}
|
||
// Wait for the goroutines to finish.
|
||
wg.Wait()
|
||
// Close the pool.
|
||
log.Println("Shutdown Program.")
|
||
p.Close()
|
||
}
|
||
|
||
// performQueries tests the resource pool of connections.
|
||
func performQueries(query int, p *Pool) {
|
||
// Acquire a connection from the pool.
|
||
conn, err := p.Acquire()
|
||
if err != nil {
|
||
log.Println(err)
|
||
return
|
||
}
|
||
// Release the connection back to the pool.
|
||
defer p.Release(conn)
|
||
// Wait to simulate a query response.
|
||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
||
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
|
||
}
|