awesome-patterns/concurrency/runner/main.go

132 lines
3.5 KiB
Go
Raw Normal View History

2017-12-08 15:04:30 +03:00
package main
2018-01-14 15:53:32 +03:00
// Example provided with help from Gabriel Aszalos.
// Package runner manages the running and lifetime of a process.
2017-12-08 15:04:30 +03:00
import (
"errors"
2018-01-14 15:53:32 +03:00
"log"
2017-12-08 15:04:30 +03:00
"os"
"os/signal"
"time"
)
2018-01-14 15:53:32 +03:00
// Runner runs a set of tasks within a given timeout and can be
// shut down on an operating system interrupt.
// a concurrency pattern for task-oriented programs that
// run unattended on a schedule. Its designed with three possible termination points:
// The program can finish its work within the allotted amount of time and terminate normally.
// The program doesnt finish in time and kills itself.
// An operating system interrupt event is received and the program attempts to
// immediately shut down cleanly.
2017-12-08 15:04:30 +03:00
type Runner struct {
2018-01-14 15:53:32 +03:00
// interrupt channel reports a signal from the
// operating system.
2017-12-08 15:04:30 +03:00
interrupt chan os.Signal
2018-01-14 15:53:32 +03:00
// complete channel reports that processing is done.
complete chan error
// timeout reports that time has run out.
timeout <-chan time.Time
// tasks holds a set of functions that are executed
// synchronously in index order.
tasks []func(int)
2017-12-08 15:04:30 +03:00
}
2018-01-14 15:53:32 +03:00
// ErrTimeout is returned when a value is received on the timeout.
var ErrTimeout = errors.New("received timeout")
2017-12-08 15:04:30 +03:00
2018-01-14 15:53:32 +03:00
// ErrInterrupt is returned when an event from the OS is received.
var ErrInterrupt = errors.New("received interrupt")
// New returns a new ready-to-use Runner.
2017-12-08 15:04:30 +03:00
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
2018-01-14 15:53:32 +03:00
// Add attaches tasks to the Runner. A task is a function that
// takes an int ID.
2017-12-08 15:04:30 +03:00
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
2018-01-14 15:53:32 +03:00
// Start runs all tasks and monitors channel events.
2017-12-08 15:04:30 +03:00
func (r *Runner) Start() error {
2018-01-14 15:53:32 +03:00
// We want to receive all interrupt based signals.
2017-12-08 15:04:30 +03:00
signal.Notify(r.interrupt, os.Interrupt)
2018-01-14 15:53:32 +03:00
// Run the different tasks on a different goroutine.
go func() {
r.complete <- r.run()
}()
select {
// Signaled when processing is done.
case err := <-r.complete:
return err
// Signaled when we run out of time.
case <-r.timeout:
return ErrTimeout
}
}
// run executes each registered task.
func (r *Runner) run() error {
for id, task := range r.tasks {
// Check for an interrupt signal from the OS.
if r.gotInterrupt() {
return ErrInterrupt
}
// Execute the registered task.
task(id)
}
return nil
}
// gotInterrupt verifies if the interrupt signal has been issued.
func (r *Runner) gotInterrupt() bool {
select {
// Signaled when an interrupt event is sent.
case <-r.interrupt:
// Stop receiving any further signals.
signal.Stop(r.interrupt)
return true
// Continue running as normal.
default:
return false
}
}
// timeout is the number of second the program has to finish.
const timeout = 3 * time.Second
// main is the entry point for the program.
func main() {
log.Println("Starting work.")
// Create a new timer value for this run.
r := New(timeout)
// Add the tasks to be run.
r.Add(createTask(), createTask(), createTask())
// Run the tasks and handle the result.
if err := r.Start(); err != nil {
switch err {
case ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
log.Println("Process ended.")
}
// createTask returns an example task that sleeps for the specified
// number of seconds based on the id.
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
2017-12-08 15:04:30 +03:00
}