awesome-patterns/concurrency/runner/main.go
2018-01-14 22:53:32 +10:00

132 lines
3.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
// Example provided with help from Gabriel Aszalos.
// Package runner manages the running and lifetime of a process.
import (
"errors"
"log"
"os"
"os/signal"
"time"
)
// Runner runs a set of tasks within a given timeout and can be
// shut down on an operating system interrupt.
// a concurrency pattern for task-oriented programs that
// run unattended on a schedule. Its designed with three possible termination points:
// The program can finish its work within the allotted amount of time and terminate normally.
// The program doesnt finish in time and kills itself.
// An operating system interrupt event is received and the program attempts to
// immediately shut down cleanly.
type Runner struct {
// interrupt channel reports a signal from the
// operating system.
interrupt chan os.Signal
// complete channel reports that processing is done.
complete chan error
// timeout reports that time has run out.
timeout <-chan time.Time
// tasks holds a set of functions that are executed
// synchronously in index order.
tasks []func(int)
}
// ErrTimeout is returned when a value is received on the timeout.
var ErrTimeout = errors.New("received timeout")
// ErrInterrupt is returned when an event from the OS is received.
var ErrInterrupt = errors.New("received interrupt")
// New returns a new ready-to-use Runner.
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
// Add attaches tasks to the Runner. A task is a function that
// takes an int ID.
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
// Start runs all tasks and monitors channel events.
func (r *Runner) Start() error {
// We want to receive all interrupt based signals.
signal.Notify(r.interrupt, os.Interrupt)
// Run the different tasks on a different goroutine.
go func() {
r.complete <- r.run()
}()
select {
// Signaled when processing is done.
case err := <-r.complete:
return err
// Signaled when we run out of time.
case <-r.timeout:
return ErrTimeout
}
}
// run executes each registered task.
func (r *Runner) run() error {
for id, task := range r.tasks {
// Check for an interrupt signal from the OS.
if r.gotInterrupt() {
return ErrInterrupt
}
// Execute the registered task.
task(id)
}
return nil
}
// gotInterrupt verifies if the interrupt signal has been issued.
func (r *Runner) gotInterrupt() bool {
select {
// Signaled when an interrupt event is sent.
case <-r.interrupt:
// Stop receiving any further signals.
signal.Stop(r.interrupt)
return true
// Continue running as normal.
default:
return false
}
}
// timeout is the number of second the program has to finish.
const timeout = 3 * time.Second
// main is the entry point for the program.
func main() {
log.Println("Starting work.")
// Create a new timer value for this run.
r := New(timeout)
// Add the tasks to be run.
r.Add(createTask(), createTask(), createTask())
// Run the tasks and handle the result.
if err := r.Start(); err != nil {
switch err {
case ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
// createTask returns an example task that sleeps for the specified
// number of seconds based on the id.
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}