From f90238c8764052a05119c89909116b8641a87592 Mon Sep 17 00:00:00 2001 From: Jian Han Date: Sun, 14 Jan 2018 22:53:32 +1000 Subject: [PATCH] completed runner pattern --- concurrency/runner/main.go | 107 +++++++++++++++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 5 deletions(-) diff --git a/concurrency/runner/main.go b/concurrency/runner/main.go index 3295e49..9aba8f1 100644 --- a/concurrency/runner/main.go +++ b/concurrency/runner/main.go @@ -1,22 +1,43 @@ package main +// Example provided with help from Gabriel Aszalos. +// Package runner manages the running and lifetime of a process. import ( "errors" + "log" "os" "os/signal" "time" ) +// Runner runs a set of tasks within a given timeout and can be +// shut down on an operating system interrupt. +// a concurrency pattern for task-oriented programs that +// run unattended on a schedule. It’s designed with three possible termination points: +// The program can finish its work within the allotted amount of time and terminate normally. +// The program doesn’t finish in time and kills itself. +// An operating system interrupt event is received and the program attempts to +// immediately shut down cleanly. type Runner struct { + // interrupt channel reports a signal from the + // operating system. interrupt chan os.Signal - complete chan error - timeout <-chan time.Time - tasks []func(int) + // complete channel reports that processing is done. + complete chan error + // timeout reports that time has run out. + timeout <-chan time.Time + // tasks holds a set of functions that are executed + // synchronously in index order. + tasks []func(int) } -var ErrTimeout = errors.New("Received Timeout") -var ErrInterrupt = errors.New("Received Interrupt") +// ErrTimeout is returned when a value is received on the timeout. +var ErrTimeout = errors.New("received timeout") +// ErrInterrupt is returned when an event from the OS is received. +var ErrInterrupt = errors.New("received interrupt") + +// New returns a new ready-to-use Runner. func New(d time.Duration) *Runner { return &Runner{ interrupt: make(chan os.Signal, 1), @@ -25,10 +46,86 @@ func New(d time.Duration) *Runner { } } +// Add attaches tasks to the Runner. A task is a function that +// takes an int ID. func (r *Runner) Add(tasks ...func(int)) { r.tasks = append(r.tasks, tasks...) } +// Start runs all tasks and monitors channel events. func (r *Runner) Start() error { + // We want to receive all interrupt based signals. signal.Notify(r.interrupt, os.Interrupt) + // Run the different tasks on a different goroutine. + go func() { + r.complete <- r.run() + }() + select { + // Signaled when processing is done. + case err := <-r.complete: + return err + // Signaled when we run out of time. + case <-r.timeout: + return ErrTimeout + } +} + +// run executes each registered task. +func (r *Runner) run() error { + for id, task := range r.tasks { + // Check for an interrupt signal from the OS. + if r.gotInterrupt() { + return ErrInterrupt + } + // Execute the registered task. + task(id) + } + return nil +} + +// gotInterrupt verifies if the interrupt signal has been issued. +func (r *Runner) gotInterrupt() bool { + select { + // Signaled when an interrupt event is sent. + case <-r.interrupt: + // Stop receiving any further signals. + signal.Stop(r.interrupt) + return true + // Continue running as normal. + default: + return false + } +} + +// timeout is the number of second the program has to finish. +const timeout = 3 * time.Second + +// main is the entry point for the program. +func main() { + log.Println("Starting work.") + // Create a new timer value for this run. + r := New(timeout) + // Add the tasks to be run. + r.Add(createTask(), createTask(), createTask()) + // Run the tasks and handle the result. + if err := r.Start(); err != nil { + switch err { + case ErrTimeout: + log.Println("Terminating due to timeout.") + os.Exit(1) + case ErrInterrupt: + log.Println("Terminating due to interrupt.") + os.Exit(2) + } + } + log.Println("Process ended.") +} + +// createTask returns an example task that sleeps for the specified +// number of seconds based on the id. +func createTask() func(int) { + return func(id int) { + log.Printf("Processor - Task #%d.", id) + time.Sleep(time.Duration(id) * time.Second) + } }