mirror of
https://github.com/crazybber/awesome-patterns.git
synced 2024-12-01 17:36:09 +03:00
292 lines
9.0 KiB
Go
292 lines
9.0 KiB
Go
|
// Copyright 2013 Ardan Studios. All rights reserved.
|
||
|
// Use of workPool source code is governed by a BSD-style
|
||
|
// license that can be found in the LICENSE file.
|
||
|
|
||
|
/*
|
||
|
Package workpool implements a pool of go routines that are dedicated to processing work that is posted into the pool.
|
||
|
Read the following blog post for more information:blogspot
|
||
|
http://www.goinggo.net/2013/05/thread-pooling-in-go-programming.html
|
||
|
New Parameters
|
||
|
The following is a list of parameters for creating a TraceLog:
|
||
|
numberOfRoutines: Sets the number of worker routines that are allowed to process work concurrently
|
||
|
queueCapacity: Sets the maximum number of pending work objects that can be in queue
|
||
|
WorkPool Management
|
||
|
Go routines are used to manage and process all the work. A single Queue routine provides the safe queuing of work.
|
||
|
The Queue routine keeps track of the amount of work in the queue and reports an error if the queue is full.
|
||
|
The concurrencyLevel parameter defines the number of work routines to create. These work routines will process work
|
||
|
subbmitted to the queue. The work routines keep track of the number of active work routines for reporting.
|
||
|
The PostWork method is used to post work into the ThreadPool. This call will block until the Queue routine reports back
|
||
|
success or failure that the work is in queue.
|
||
|
Example Use Of ThreadPool
|
||
|
The following shows a simple test application
|
||
|
package main
|
||
|
import (
|
||
|
"github.com/goinggo/workpool"
|
||
|
"bufio"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"runtime"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
)
|
||
|
type MyWork struct {
|
||
|
Name string "The Name of a person"
|
||
|
BirthYear int "The Yea the person was born"
|
||
|
WP *workpool.WorkPool
|
||
|
}
|
||
|
func (workPool *MyWork) DoWork(workRoutine int) {
|
||
|
fmt.Printf("%s : %d\n", workPool.Name, workPool.BirthYear)
|
||
|
fmt.Printf("*******> WR: %d QW: %d AR: %d\n", workRoutine, workPool.WP.QueuedWork(), workPool.WP.ActiveRoutines())
|
||
|
time.Sleep(100 * time.Millisecond)
|
||
|
//panic("test")
|
||
|
}
|
||
|
func main() {
|
||
|
runtime.GOMAXPROCS(runtime.NumCPU())
|
||
|
workPool := workpool.New(runtime.NumCPU() * 3, 10)
|
||
|
shutdown := false // Just for testing, I Know
|
||
|
go func() {
|
||
|
for i := 0; i < 1000; i++ {
|
||
|
work := &MyWork{
|
||
|
Name: "A" + strconv.Itoa(i),
|
||
|
BirthYear: i,
|
||
|
WP: workPool,
|
||
|
}
|
||
|
err := workPool.PostWork("name_routine", work)
|
||
|
if err != nil {
|
||
|
fmt.Printf("ERROR: %s\n", err)
|
||
|
time.Sleep(100 * time.Millisecond)
|
||
|
}
|
||
|
if shutdown == true {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
fmt.Println("Hit any key to exit")
|
||
|
reader := bufio.NewReader(os.Stdin)
|
||
|
reader.ReadString('\n')
|
||
|
shutdown = true
|
||
|
fmt.Println("Shutting Down\n")
|
||
|
workPool.Shutdown("name_routine")
|
||
|
}
|
||
|
Example Output
|
||
|
The following shows some sample output
|
||
|
A336 : 336
|
||
|
******> QW: 5 AR: 8
|
||
|
A337 : 337
|
||
|
*******> QW: 4 AR: 8
|
||
|
ERROR: Thread Pool At Capacity
|
||
|
A338 : 338
|
||
|
*******> QW: 3 AR: 8
|
||
|
A339 : 339
|
||
|
*******> QW: 2 AR: 8
|
||
|
CHANGE FOR ARTICLE
|
||
|
*/
|
||
|
package main
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"runtime"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
ErrCapacity = errors.New("Thread Pool At Capacity")
|
||
|
)
|
||
|
|
||
|
type (
|
||
|
// poolWork is passed into the queue for work to be performed.
|
||
|
poolWork struct {
|
||
|
work PoolWorker // The Work to be performed.
|
||
|
resultChannel chan error // Used to inform the queue operaion is complete.
|
||
|
}
|
||
|
|
||
|
// WorkPool implements a work pool with the specified concurrency level and queue capacity.
|
||
|
WorkPool struct {
|
||
|
shutdownQueueChannel chan string // Channel used to shut down the queue routine.
|
||
|
shutdownWorkChannel chan struct{} // Channel used to shut down the work routines.
|
||
|
shutdownWaitGroup sync.WaitGroup // The WaitGroup for shutting down existing routines.
|
||
|
queueChannel chan poolWork // Channel used to sync access to the queue.
|
||
|
workChannel chan PoolWorker // Channel used to process work.
|
||
|
queuedWork int32 // The number of work items queued.
|
||
|
activeRoutines int32 // The number of routines active.
|
||
|
queueCapacity int32 // The max number of items we can store in the queue.
|
||
|
}
|
||
|
)
|
||
|
|
||
|
// PoolWorker must be implemented by the object we will perform work on, now.
|
||
|
type PoolWorker interface {
|
||
|
DoWork(workRoutine int)
|
||
|
}
|
||
|
|
||
|
// init is called when the system is inited.
|
||
|
func init() {
|
||
|
log.SetPrefix("TRACE: ")
|
||
|
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
|
||
|
}
|
||
|
|
||
|
// New creates a new WorkPool.
|
||
|
func New(numberOfRoutines int, queueCapacity int32) *WorkPool {
|
||
|
workPool := WorkPool{
|
||
|
shutdownQueueChannel: make(chan string),
|
||
|
shutdownWorkChannel: make(chan struct{}),
|
||
|
queueChannel: make(chan poolWork),
|
||
|
workChannel: make(chan PoolWorker, queueCapacity),
|
||
|
queuedWork: 0,
|
||
|
activeRoutines: 0,
|
||
|
queueCapacity: queueCapacity,
|
||
|
}
|
||
|
|
||
|
// Add the total number of routines to the wait group
|
||
|
workPool.shutdownWaitGroup.Add(numberOfRoutines)
|
||
|
|
||
|
// Launch the work routines to process work
|
||
|
for workRoutine := 0; workRoutine < numberOfRoutines; workRoutine++ {
|
||
|
go workPool.workRoutine(workRoutine)
|
||
|
}
|
||
|
|
||
|
// Start the queue routine to capture and provide work
|
||
|
go workPool.queueRoutine()
|
||
|
|
||
|
return &workPool
|
||
|
}
|
||
|
|
||
|
// Shutdown will release resources and shutdown all processing.
|
||
|
func (workPool *WorkPool) Shutdown(goRoutine string) (err error) {
|
||
|
defer catchPanic(&err, goRoutine, "Shutdown")
|
||
|
|
||
|
writeStdout(goRoutine, "Shutdown", "Started")
|
||
|
writeStdout(goRoutine, "Shutdown", "Queue Routine")
|
||
|
|
||
|
workPool.shutdownQueueChannel <- "Down"
|
||
|
<-workPool.shutdownQueueChannel
|
||
|
|
||
|
close(workPool.queueChannel)
|
||
|
close(workPool.shutdownQueueChannel)
|
||
|
|
||
|
writeStdout(goRoutine, "Shutdown", "Shutting Down Work Routines")
|
||
|
|
||
|
// Close the channel to shut things down.
|
||
|
close(workPool.shutdownWorkChannel)
|
||
|
workPool.shutdownWaitGroup.Wait()
|
||
|
|
||
|
close(workPool.workChannel)
|
||
|
|
||
|
writeStdout(goRoutine, "Shutdown", "Completed")
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// PostWork will post work into the WorkPool. This call will block until the Queue routine reports back
|
||
|
// success or failure that the work is in queue.
|
||
|
func (workPool *WorkPool) PostWork(goRoutine string, work PoolWorker) (err error) {
|
||
|
defer catchPanic(&err, goRoutine, "PostWork")
|
||
|
|
||
|
poolWork := poolWork{work, make(chan error)}
|
||
|
|
||
|
defer close(poolWork.resultChannel)
|
||
|
|
||
|
workPool.queueChannel <- poolWork
|
||
|
err = <-poolWork.resultChannel
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// QueuedWork will return the number of work items in queue.
|
||
|
func (workPool *WorkPool) QueuedWork() int32 {
|
||
|
return atomic.AddInt32(&workPool.queuedWork, 0)
|
||
|
}
|
||
|
|
||
|
// ActiveRoutines will return the number of routines performing work.
|
||
|
func (workPool *WorkPool) ActiveRoutines() int32 {
|
||
|
return atomic.AddInt32(&workPool.activeRoutines, 0)
|
||
|
}
|
||
|
|
||
|
// CatchPanic is used to catch any Panic and log exceptions to Stdout. It will also write the stack trace.
|
||
|
func catchPanic(err *error, goRoutine string, functionName string) {
|
||
|
if r := recover(); r != nil {
|
||
|
// Capture the stack trace
|
||
|
buf := make([]byte, 10000)
|
||
|
runtime.Stack(buf, false)
|
||
|
|
||
|
writeStdoutf(goRoutine, functionName, "PANIC Defered [%v] : Stack Trace : %v", r, string(buf))
|
||
|
|
||
|
if err != nil {
|
||
|
*err = fmt.Errorf("%v", r)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// writeStdout is used to write a system message directly to stdout.
|
||
|
func writeStdout(goRoutine string, functionName string, message string) {
|
||
|
log.Printf("%s : %s : %s\n", goRoutine, functionName, message)
|
||
|
}
|
||
|
|
||
|
// writeStdoutf is used to write a formatted system message directly stdout.
|
||
|
func writeStdoutf(goRoutine string, functionName string, format string, a ...interface{}) {
|
||
|
writeStdout(goRoutine, functionName, fmt.Sprintf(format, a...))
|
||
|
}
|
||
|
|
||
|
// workRoutine performs the work required by the work pool
|
||
|
func (workPool *WorkPool) workRoutine(workRoutine int) {
|
||
|
for {
|
||
|
select {
|
||
|
// Shutdown the WorkRoutine.
|
||
|
case <-workPool.shutdownWorkChannel:
|
||
|
writeStdout(fmt.Sprintf("WorkRoutine %d", workRoutine), "workRoutine", "Going Down")
|
||
|
workPool.shutdownWaitGroup.Done()
|
||
|
return
|
||
|
|
||
|
// There is work in the queue.
|
||
|
case poolWorker := <-workPool.workChannel:
|
||
|
workPool.safelyDoWork(workRoutine, poolWorker)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// safelyDoWork executes the user DoWork method.
|
||
|
func (workPool *WorkPool) safelyDoWork(workRoutine int, poolWorker PoolWorker) {
|
||
|
defer catchPanic(nil, "WorkRoutine", "SafelyDoWork")
|
||
|
defer atomic.AddInt32(&workPool.activeRoutines, -1)
|
||
|
|
||
|
// Update the counts
|
||
|
atomic.AddInt32(&workPool.queuedWork, -1)
|
||
|
atomic.AddInt32(&workPool.activeRoutines, 1)
|
||
|
|
||
|
// Perform the work
|
||
|
poolWorker.DoWork(workRoutine)
|
||
|
}
|
||
|
|
||
|
// queueRoutine captures and provides work.
|
||
|
func (workPool *WorkPool) queueRoutine() {
|
||
|
for {
|
||
|
select {
|
||
|
// Shutdown the QueueRoutine.
|
||
|
case <-workPool.shutdownQueueChannel:
|
||
|
writeStdout("Queue", "queueRoutine", "Going Down")
|
||
|
workPool.shutdownQueueChannel <- "Down"
|
||
|
return
|
||
|
|
||
|
// Post work to be processed.
|
||
|
case queueItem := <-workPool.queueChannel:
|
||
|
// If the queue is at capacity don't add it.
|
||
|
if atomic.AddInt32(&workPool.queuedWork, 0) == workPool.queueCapacity {
|
||
|
queueItem.resultChannel <- ErrCapacity
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// Increment the queued work count.
|
||
|
atomic.AddInt32(&workPool.queuedWork, 1)
|
||
|
|
||
|
// Queue the work for the WorkRoutine to process.
|
||
|
workPool.workChannel <- queueItem.work
|
||
|
|
||
|
// Tell the caller the work is queued.
|
||
|
queueItem.resultChannel <- nil
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|