package main

import (
	"errors"
	"io"
	"log"
	"sync"
)

// Pool manages a set of resources that can be shared safely by multiple
// goroutines. Any type can be pooled as long as it implements io.Closer.
// Create a Pool with New; callers may create as many independent pools as
// they need.
type Pool struct {
	// m serializes Release and Close so that a resource is never sent on
	// the resources channel after that channel has been closed.
	m sync.Mutex

	// resources is a buffered channel holding the idle resources that are
	// available for sharing. Its capacity is the size passed to New.
	resources chan io.Closer

	// factory creates a new resource when Acquire finds no idle one. It is
	// supplied by the user of the pool.
	factory func() (io.Closer, error)

	// closed reports whether Close has been called. It is guarded by m.
	closed bool
}

// ErrPoolClosed is returned when Acquire is called on a closed pool.
// Callers can compare against it with errors.Is.
var ErrPoolClosed = errors.New("Pool has been closed")

// New creates a pool that manages resources.
//
// fn is the factory used to allocate a new resource whenever the pool has no
// idle one to hand out. size is the capacity of the buffered channel that
// holds idle resources, i.e. the maximum number of resources kept for reuse.
//
// New returns an error if fn is nil or if size is zero.
func New(fn func() (io.Closer, error), size uint32) (*Pool, error) {
	// A nil factory would make Acquire panic as soon as the pool is empty.
	if fn == nil {
		return nil, errors.New("factory function is nil")
	}

	// size is unsigned, so zero is the only value that is too small.
	if size == 0 {
		return nil, errors.New("Size value too small")
	}

	return &Pool{
		factory:   fn,
		resources: make(chan io.Closer, size),
	}, nil
}

// Acquire retrieves a resource from the pool.
//
// If an idle resource is available it is returned; otherwise a new one is
// created with the pool's factory and the factory's result (including any
// error) is returned as is. Once the pool has been closed, Acquire returns
// ErrPoolClosed.
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	// Non-blocking check for an idle resource. After Close, the channel is
	// closed and drained, so this case fires with ok == false.
	case r, ok := <-p.resources:
		if !ok {
			return nil, ErrPoolClosed
		}
		log.Println("Aquire: ", "Shared Resource")
		return r, nil

	// Nothing idle: hand the caller a freshly created resource.
	default:
		log.Println("Aquire: ", "New Resource")
		return p.factory()
	}
}

// Release returns a resource to the pool once the caller no longer needs it.
//
// If the pool is closed, or already holds as many idle resources as its
// capacity allows, the resource is closed instead of being queued. Releasing
// a nil resource is a no-op.
func (p *Pool) Release(r io.Closer) {
	// Calling Close on a nil interface value would panic.
	if r == nil {
		return
	}

	// The mutex serves two purposes: it protects the read of the closed
	// flag from racing with the write in Close, and it guarantees we never
	// send on the resources channel after Close has closed it, which would
	// panic. While closed is false the channel is known to still be open.
	p.m.Lock()
	defer p.m.Unlock()

	// A closed pool has already been flushed, so there is nowhere to put
	// the resource back; close it directly.
	if p.closed {
		closeResource(r)
		return
	}

	select {
	// Attempt to place the resource back on the queue.
	case p.resources <- r:
		log.Println("Release: ", "In Queue")

	// The queue is already at capacity, so close the resource.
	default:
		log.Println("Release:", "Closing")
		closeResource(r)
	}
}

// Close shuts the pool down and closes every idle resource it still holds.
//
// Close holds the same mutex as Release, so the two never run concurrently,
// and calling Close more than once is harmless. Resources that are currently
// acquired are closed later, when they are passed to Release.
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	// If the pool is already closed, there is nothing left to do.
	if p.closed {
		return
	}
	p.closed = true

	// Close the channel before draining it; otherwise the range loop below
	// would block forever once the channel is empty.
	close(p.resources)

	for r := range p.resources {
		closeResource(r)
	}
}

// closeResource closes r and logs a failure instead of silently dropping it.
func closeResource(r io.Closer) {
	if err := r.Close(); err != nil {
		log.Println("Close:", err)
	}
}