2018-01-14 10:37:45 +03:00
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"errors"
|
|
|
|
|
"io"
|
|
|
|
|
"log"
|
2018-01-14 11:33:59 +03:00
|
|
|
|
"math/rand"
|
2018-01-14 10:37:45 +03:00
|
|
|
|
"sync"
|
2018-01-14 11:33:59 +03:00
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
2018-01-14 10:37:45 +03:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Pool manages a set of resources that can be shared safely by
|
|
|
|
|
// multiply goroutines. The resource been managed must implement
|
|
|
|
|
// the io.Closer interface.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// declares a struct named Pool that allows
|
|
|
|
|
// the caller to create as many different pools as needed. Each pool can manage any type
|
|
|
|
|
// of resource as long as the type implements the io.Closer interface.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
type Pool struct {
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// This mutex is used to keep all the operations against a Pool value-safe for multigoroutine access.
|
|
|
|
|
m sync.Mutex
|
|
|
|
|
// The second field is named resources and is declared as a channel of interface type io.Closer.
|
|
|
|
|
// This channel will be created as a buffered channel and will contain the resources being shared.
|
|
|
|
|
// Because an interface type is being used, the pool can manage any type of resource that
|
|
|
|
|
// implements the io.Closer interface
|
2018-01-14 10:37:45 +03:00
|
|
|
|
resources chan io.Closer
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// The factory field is of a function type. Any function that takes no parameters and
|
|
|
|
|
// returns an io.Closer and an error interface value can be assigned to this field. The
|
|
|
|
|
// purpose of this function is to create a new resource when the pool requires one. This
|
|
|
|
|
// functionality is an implementation detail beyond the scope of the pool package and
|
|
|
|
|
// needs to be implemented and supplied by the user using this package.
|
|
|
|
|
factory func() (io.Closer, error)
|
|
|
|
|
// This field is a flag that indicates the Pool is being shut down or is already shut down.
|
|
|
|
|
closed bool
|
2018-01-14 10:37:45 +03:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ErrPoolClosed is returned when an Acquire returns on a closed pool.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// Creating error interface variables is a common practice in Go. This allows the caller
|
|
|
|
|
// to identify specific returned error values from any function or method within the package.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
var ErrPoolClosed = errors.New("Pool has been closed")
|
|
|
|
|
|
|
|
|
|
// New creates a pool that manage resources. A pool requires a function
|
|
|
|
|
// that can allocate a new resource and the size of the pool.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// The function parameter represents a factory function that creates values of the resource being managed by the pool.
|
|
|
|
|
// The second parameter, size, represents the size of the buffered channel created to hold the resources.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
func New(fn func() (io.Closer, error), size uint32) (*Pool, error) {
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// The first parameter, fn, is declared as a function type that accepts no parameters and
|
|
|
|
|
// returns an io.Closer and an error interface value
|
2018-01-14 10:37:45 +03:00
|
|
|
|
if size <= 0 {
|
|
|
|
|
return nil, errors.New("Size value too small")
|
|
|
|
|
}
|
|
|
|
|
return &Pool{
|
|
|
|
|
factory: fn,
|
|
|
|
|
resources: make(chan io.Closer, size),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Acquire retrieves a resource from the pool.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// This method returns a resource from the pool if one is available, or creates a new one for the call.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
func (p *Pool) Acquire() (io.Closer, error) {
|
|
|
|
|
select {
|
|
|
|
|
// Check for free resources
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// using a select / case statement to check if there’s a resource in the buffered channel.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
case r, ok := <-p.resources:
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, ErrPoolClosed
|
|
|
|
|
}
|
|
|
|
|
log.Println("Aquire: ", "Shared Resource")
|
|
|
|
|
return r, nil
|
|
|
|
|
default:
|
|
|
|
|
log.Println("Aquire: ", "New Resource")
|
|
|
|
|
return p.factory()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Release place a new resource into the pool.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// After a resource is acquired and no longer needed, it must be released back into
|
|
|
|
|
// the pool. This is where the Release method comes in.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
func (p *Pool) Release(r io.Closer) {
|
|
|
|
|
// Secure this operation with the Close operation.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// The use of the mutex serves two purposes. First, it protects the
|
|
|
|
|
// read on the closed flag on line 65 from happening at the same time as a write on this
|
|
|
|
|
// flag in the Close method. Second, we don’t want to attempt to send on a closed channel because this will cause a panic. When the closed
|
|
|
|
|
// field is false, we know the resources channel has been closed
|
2018-01-14 10:37:45 +03:00
|
|
|
|
p.m.Lock()
|
|
|
|
|
defer p.m.Unlock()
|
|
|
|
|
// If the pool is closed, discard the resource
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// the Close method on the resource is called directly when the pool is
|
|
|
|
|
// closed. This is because there’s no way to release the resource back into the pool. At
|
|
|
|
|
// this point the pool has been both closed and flushed.
|
2018-01-14 10:37:45 +03:00
|
|
|
|
if p.closed {
|
|
|
|
|
r.Close()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
// Attempt to place the new resource on the queue
|
|
|
|
|
case p.resources <- r:
|
|
|
|
|
log.Println("Release: ", "In Queue")
|
|
|
|
|
default:
|
|
|
|
|
// If the queue is already at capacity we close the resource
|
|
|
|
|
log.Println("Release:", "Closing")
|
|
|
|
|
r.Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close will shutdown the pool and close all existing resources.
|
2018-01-14 11:22:50 +03:00
|
|
|
|
// Once the program is finished with the pool, it should call the Close method.
|
|
|
|
|
// The method closes and flushes the buffered channel on lines 98 and 101, closing any resources that exist until the channel is
|
|
|
|
|
// empty. All the code in this method must be executed by only one goroutine at a time.
|
|
|
|
|
// In fact, when this code is being executed, goroutines must also be
|
|
|
|
|
// prevented from executing code in the Release method. You’ll understand why this is important soon
|
2018-01-14 10:37:45 +03:00
|
|
|
|
func (p *Pool) Close() {
|
|
|
|
|
// Secure this operation with release operation.
|
|
|
|
|
p.m.Lock()
|
|
|
|
|
defer p.m.Unlock()
|
|
|
|
|
// If the pool is already closed, do not do anything.
|
|
|
|
|
if p.closed {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// set the pool as closed.
|
|
|
|
|
p.closed = true
|
|
|
|
|
// Close the channel before we drain the channel of its
|
|
|
|
|
// resources
|
|
|
|
|
close(p.resources)
|
|
|
|
|
// Close the resources
|
|
|
|
|
for r := range p.resources {
|
|
|
|
|
r.Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
2018-01-14 11:33:59 +03:00
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
maxGoroutines = 25
|
|
|
|
|
pooledResources = 2
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// dbConnection simulates a resource to share.
|
|
|
|
|
type dbConnection struct {
|
|
|
|
|
ID int32
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close implements the io.Closer interface so dbConnection
|
|
|
|
|
// can be managed by the pool. Close performs any resource
|
|
|
|
|
// release management.
|
|
|
|
|
func (dbConn *dbConnection) Close() error {
|
|
|
|
|
log.Println("Close: Connection", dbConn.ID)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// idCounter provides support for giving each connection a unique id.
|
|
|
|
|
var idCounter int32
|
|
|
|
|
|
|
|
|
|
// createConnection is a factory method that will be called by
|
|
|
|
|
// the pool when a new connection is needed.
|
|
|
|
|
func createConnection() (io.Closer, error) {
|
|
|
|
|
id := atomic.AddInt32(&idCounter, 1)
|
|
|
|
|
log.Println("Create: New Connection", id)
|
|
|
|
|
return &dbConnection{id}, nil
|
|
|
|
|
}
|
|
|
|
|
func main() {
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
wg.Add(maxGoroutines)
|
|
|
|
|
// Create the pool to manage our connections.
|
|
|
|
|
p, err := New(createConnection, pooledResources)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
}
|
|
|
|
|
// Perform queries using connections from the pool.
|
|
|
|
|
for query := 0; query < maxGoroutines; query++ {
|
|
|
|
|
// Each goroutine needs its own copy of the query
|
|
|
|
|
// value else they will all be sharing the same query
|
|
|
|
|
// variable.
|
|
|
|
|
go func(q int) {
|
|
|
|
|
performQueries(q, p)
|
|
|
|
|
wg.Done()
|
|
|
|
|
}(query)
|
|
|
|
|
}
|
|
|
|
|
// Wait for the goroutines to finish.
|
|
|
|
|
wg.Wait()
|
|
|
|
|
// Close the pool.
|
|
|
|
|
log.Println("Shutdown Program.")
|
|
|
|
|
p.Close()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// performQueries tests the resource pool of connections.
|
|
|
|
|
func performQueries(query int, p *Pool) {
|
|
|
|
|
// Acquire a connection from the pool.
|
|
|
|
|
conn, err := p.Acquire()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Println(err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Release the connection back to the pool.
|
|
|
|
|
defer p.Release(conn)
|
|
|
|
|
// Wait to simulate a query response.
|
|
|
|
|
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
|
|
|
|
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
|
|
|
|
|
}
|