changes for JobManager internal logic, partial tests

This commit is contained in:
Pavel 2019-12-17 10:39:48 +03:00
parent c191d2c8e0
commit ee1de01b31
4 changed files with 474 additions and 112 deletions

View File

@ -21,6 +21,7 @@ type Engine struct {
httpClient *http.Client httpClient *http.Client
Logger *logging.Logger Logger *logging.Logger
csrf *CSRF csrf *CSRF
jobManager *JobManager
Sessions sessions.Store Sessions sessions.Store
Config ConfigInterface Config ConfigInterface
LogFormatter logging.Formatter LogFormatter logging.Formatter
@ -134,6 +135,15 @@ func (e *Engine) Router() *gin.Engine {
return e.ginEngine return e.ginEngine
} }
// JobManager will return singleton JobManager from Engine
func (e *Engine) JobManager() *JobManager {
if e.jobManager == nil {
e.jobManager = NewJobManager().SetLogger(e.Logger).SetLogging(e.Config.IsDebug())
}
return e.jobManager
}
// BuildHTTPClient builds HTTP client with provided configuration // BuildHTTPClient builds HTTP client with provided configuration
func (e *Engine) BuildHTTPClient(replaceDefault ...bool) *Engine { func (e *Engine) BuildHTTPClient(replaceDefault ...bool) *Engine {
if e.Config.GetHTTPClientConfig() != nil { if e.Config.GetHTTPClientConfig() != nil {

View File

@ -142,6 +142,17 @@ func (e *EngineTest) Test_Router() {
assert.NotNil(e.T(), e.engine.Router()) assert.NotNil(e.T(), e.engine.Router())
} }
func (e *EngineTest) Test_JobManager() {
defer func() {
require.Nil(e.T(), recover())
}()
require.Nil(e.T(), e.engine.jobManager)
manager := e.engine.JobManager()
require.NotNil(e.T(), manager)
assert.Equal(e.T(), manager, e.engine.JobManager())
}
func (e *EngineTest) Test_ConfigureRouter() { func (e *EngineTest) Test_ConfigureRouter() {
e.engine.TranslationsPath = testTranslationsDir e.engine.TranslationsPath = testTranslationsDir
e.engine.Prepare() e.engine.Prepare()

View File

@ -10,33 +10,97 @@ import (
) )
// JobFunc is empty func which should be executed in a parallel goroutine // JobFunc is empty func which should be executed in a parallel goroutine
type JobFunc func() error type JobFunc func(JobLogFunc) error
// JobLogFunc is a function which logs data from job
type JobLogFunc func(string, logging.Level, ...interface{})
// JobErrorHandler is a function to handle jobs errors. First argument is a job name. // JobErrorHandler is a function to handle jobs errors. First argument is a job name.
type JobErrorHandler func(string, error, *logging.Logger) type JobErrorHandler func(string, error, JobLogFunc)
// JobPanicHandler is a function to handle jobs panics. First argument is a job name. // JobPanicHandler is a function to handle jobs panics. First argument is a job name.
type JobPanicHandler func(string, interface{}, *logging.Logger) type JobPanicHandler func(string, interface{}, JobLogFunc)
// Job represents single job. Regular job will be executed every Interval. // Job represents single job. Regular job will be executed every Interval.
type Job struct { type Job struct {
Command JobFunc Command JobFunc
ErrorHandler JobErrorHandler ErrorHandler JobErrorHandler
PanicHandler JobPanicHandler PanicHandler JobPanicHandler
Regular bool
Interval time.Duration Interval time.Duration
lastExecuted time.Time Regular bool
active bool
stopChannel chan bool
} }
// JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as // JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as
// singleton), or jobs can be executed as regular jobs. Example initialization: // singleton), or jobs can be executed as regular jobs. Example initialization:
// TODO example initialization // TODO example initialization
type JobManager struct { type JobManager struct {
jobs *sync.Map jobs *sync.Map
enableLogging bool enableLogging bool
logger *logging.Logger logger *logging.Logger
executorInterval time.Duration }
executorChannel chan bool
// getWrappedFunc wraps job into function
func (j *Job) getWrappedFunc(name string, log JobLogFunc) func() {
return func() {
defer func() {
if r := recover(); r != nil && j.PanicHandler != nil {
j.PanicHandler(name, r, log)
}
}()
if err := j.Command(log); err != nil && j.ErrorHandler != nil {
j.ErrorHandler(name, err, log)
}
}
}
// getWrappedTimerFunc returns job timer func to run in the separate goroutine
func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func() {
return func() {
for range time.NewTicker(j.Interval).C {
select {
case <-j.stopChannel:
return
default:
j.getWrappedFunc(name, log)()
}
}
}
}
// run job
func (j *Job) run(name string, log JobLogFunc) *Job {
if j.Regular && j.Interval > 0 && !j.active {
j.stopChannel = make(chan bool)
go j.getWrappedTimerFunc(name, log)()
j.active = true
}
return j
}
// stop running job
func (j *Job) stop() *Job {
if j.active && j.stopChannel != nil {
j.stopChannel <- true
j.active = false
}
return j
}
// runOnce run job once
func (j *Job) runOnce(name string, log JobLogFunc) *Job {
go j.getWrappedFunc(name, log)()
return j
}
// runOnceSync run job once in current goroutine
func (j *Job) runOnceSync(name string, log JobLogFunc) *Job {
j.getWrappedFunc(name, log)()
return j
} }
// NewJobManager is a JobManager constructor // NewJobManager is a JobManager constructor
@ -44,33 +108,20 @@ func NewJobManager() *JobManager {
return &JobManager{jobs: &sync.Map{}} return &JobManager{jobs: &sync.Map{}}
} }
// DefaultExecutorInterval is a default recommended interval for main job executor // DefaultJobErrorHandler returns default error handler for a job
func DefaultExecutorInterval() time.Duration { func DefaultJobErrorHandler() JobErrorHandler {
return time.Minute return func(name string, err error, log JobLogFunc) {
} if err != nil && name != "" {
log("Job `%s` errored with an error: `%s`", logging.ERROR, name, err.Error())
// DefaultJobErrorHandler is a default error handler for a job
func DefaultJobErrorHandler(name string, err error, logger *logging.Logger) {
if err != nil && name != "" {
message := fmt.Sprintf("Job `%s` errored with an error: `%s`", name, err.Error())
if logger != nil {
logger.Error(message)
} else {
fmt.Print("[ERROR]", message)
} }
} }
} }
// DefaultJobPanicHandler is a default panic handler for a job // DefaultJobPanicHandler returns default panic handler for a job
func DefaultJobPanicHandler(name string, recoverValue interface{}, logger *logging.Logger) { func DefaultJobPanicHandler() JobPanicHandler {
if recoverValue != nil && name != "" { return func(name string, recoverValue interface{}, log JobLogFunc) {
message := fmt.Sprintf("Job `%s` panicked with value: `%#v`", name, recoverValue) if recoverValue != nil && name != "" {
log("Job `%s` panicked with value: `%#v`", logging.ERROR, name, recoverValue)
if logger != nil {
logger.Error(message)
} else {
fmt.Print("[ERROR]", message)
} }
} }
} }
@ -91,124 +142,135 @@ func (j *JobManager) SetLogging(enableLogging bool) *JobManager {
} }
// RegisterJob registers new job // RegisterJob registers new job
func (j *JobManager) RegisterJob(name string, job Job) error { func (j *JobManager) RegisterJob(name string, job *Job) error {
if i, ok := j.jobs.Load(name); ok { if job == nil {
if _, ok := j.asJob(i); ok { return errors.New("job shouldn't be nil")
return errors.New("job already exists") }
}
if _, ok := j.FetchJob(name); ok {
return errors.New("job already exists")
} }
j.jobs.Store(name, job) j.jobs.Store(name, job)
return nil return nil
} }
// UnregisterJob unregisters job if it's exists. Returns error if job doesn't exist.
func (j *JobManager) UnregisterJob(name string) error {
if i, ok := j.FetchJob(name); ok {
i.stop()
j.jobs.Delete(name)
}
return fmt.Errorf("cannot find job `%s`", name)
}
// FetchJob fetches already exist job // FetchJob fetches already exist job
func (j *JobManager) FetchJob(name string) (value Job, ok bool) { func (j *JobManager) FetchJob(name string) (value *Job, ok bool) {
if i, ok := j.jobs.Load(name); ok { if i, ok := j.jobs.Load(name); ok {
if job, ok := j.asJob(i); ok { if job, ok := j.asJob(i); ok {
return job, ok return job, ok
} }
} }
return Job{}, false return &Job{}, false
} }
// UpdateJob updates job // UpdateJob updates job
func (j *JobManager) UpdateJob(name string, job Job) error { func (j *JobManager) UpdateJob(name string, job *Job) error {
if job, ok := j.FetchJob(name); ok { if job, ok := j.FetchJob(name); ok {
j.jobs.Delete(name) _ = j.UnregisterJob(name)
return j.RegisterJob(name, job) return j.RegisterJob(name, job)
} }
return fmt.Errorf("cannot find job `%s`", name) return fmt.Errorf("cannot find job `%s`", name)
} }
// ExecuteJob executes provided job if it's exists. It's async operation and error returns only of job wasn't executed at all. // RunJob starts provided regular job if it's exists. It's async operation and error returns only of job wasn't executed at all.
func (j *JobManager) ExecuteJob(name string, resetInterval bool) error { func (j *JobManager) RunJob(name string) error {
if job, ok := j.FetchJob(name); ok { if job, ok := j.FetchJob(name); ok {
return j.runJob(name, job, resetInterval) job.run(name, j.log)
return nil
} }
return fmt.Errorf("cannot find job `%s`", name) return fmt.Errorf("cannot find job `%s`", name)
} }
// StartExecutor runs executor // StopJob stops provided regular regular job if it's exists.
func (j *JobManager) StartExecutor(executorInterval time.Duration) error { func (j *JobManager) StopJob(name string) error {
if executorInterval <= 0 { if job, ok := j.FetchJob(name); ok {
return errors.New("executorInterval must be higher that 0") job.stop()
return nil
} }
if j.executorChannel != nil { return fmt.Errorf("cannot find job `%s`", name)
return errors.New("executor is already active")
}
j.executorInterval = executorInterval
j.executorChannel = make(chan bool)
go func(stop chan bool) {
for _ = range time.NewTicker(j.executorInterval).C {
select {
case <-stop:
return
case <-time.After(time.Second):
j.jobs.Range(func(key, value interface{}) bool {
if job, ok := j.asJob(value); ok {
if name, ok := key.(string); ok {
if job.Regular &&
job.lastExecuted.Before(time.Now()) &&
time.Since(job.lastExecuted) >= job.Interval {
if err := j.runJob(name, job, true); err != nil {
j.logError("error while executing job `%s`: %s", name, err.Error())
}
}
}
}
return true
})
}
}
}(j.executorChannel)
return nil
} }
// logError logs error // RunJobOnce starts provided job once if it exists. It's also async.
func (j *JobManager) logError(format string, args ...interface{}) { func (j *JobManager) RunJobOnce(name string) error {
if j.logger != nil { if job, ok := j.FetchJob(name); ok {
j.logger.Errorf(format, args...) job.runOnce(name, j.log)
return nil
} }
fmt.Printf(format, args...) return fmt.Errorf("cannot find job `%s`", name)
}
// RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work.
func (j *JobManager) RunJobOnceSync(name string) error {
if job, ok := j.FetchJob(name); ok {
job.runOnceSync(name, j.log)
return nil
}
return fmt.Errorf("cannot find job `%s`", name)
}
// log logs via logger or as plaintext
func (j *JobManager) log(format string, severity logging.Level, args ...interface{}) {
if !j.enableLogging {
return
}
if j.logger != nil {
switch severity {
case logging.CRITICAL:
j.logger.Criticalf(format, args...)
case logging.ERROR:
j.logger.Errorf(format, args...)
case logging.WARNING:
j.logger.Warningf(format, args...)
case logging.NOTICE:
j.logger.Noticef(format, args...)
case logging.INFO:
j.logger.Infof(format, args...)
case logging.DEBUG:
j.logger.Debugf(format, args...)
}
}
switch severity {
case logging.CRITICAL:
fmt.Print("[CRITICAL] ", fmt.Sprintf(format, args...))
case logging.ERROR:
fmt.Print("[ERROR] ", fmt.Sprintf(format, args...))
case logging.WARNING:
fmt.Print("[WARNING] ", fmt.Sprintf(format, args...))
case logging.NOTICE:
fmt.Print("[NOTICE] ", fmt.Sprintf(format, args...))
case logging.INFO:
fmt.Print("[INFO] ", fmt.Sprintf(format, args...))
case logging.DEBUG:
fmt.Print("[DEBUG] ", fmt.Sprintf(format, args...))
}
} }
// asJob casts interface to a Job // asJob casts interface to a Job
func (j *JobManager) asJob(v interface{}) (Job, bool) { func (j *JobManager) asJob(v interface{}) (*Job, bool) {
if job, ok := v.(Job); ok { if job, ok := v.(*Job); ok {
return job, ok return job, ok
} }
return Job{}, false return &Job{}, false
}
// runJob executes provided job from object. It's async operation and error returns only of job wasn't executed at all.
func (j *JobManager) runJob(name string, job Job, resetInterval bool) error {
go func() {
defer func() {
if r := recover(); r != nil && job.PanicHandler != nil {
job.PanicHandler(name, r, j.logger)
}
}()
if err := job.Command(); err != nil && job.ErrorHandler != nil {
job.ErrorHandler(name, err, j.logger)
}
}()
if resetInterval {
job.lastExecuted = time.Now()
return j.UpdateJob(name, job)
}
return fmt.Errorf("cannot find job `%s`", name)
} }

279
core/job_manager_test.go Normal file
View File

@ -0,0 +1,279 @@
package core
import (
"errors"
"fmt"
"testing"
"time"
"github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type JobTest struct {
suite.Suite
job *Job
executed bool
executeErr error
lastLog string
lastMsgLevel logging.Level
panicValue interface{}
}
type JobManagerTest struct {
suite.Suite
manager *JobManager
runnerFlag bool
}
func TestJob(t *testing.T) {
suite.Run(t, new(JobTest))
}
func TestJobManager(t *testing.T) {
suite.Run(t, new(JobManagerTest))
}
func TestDefaultJobErrorHandler(t *testing.T) {
defer func() {
require.Nil(t, recover())
}()
fn := DefaultJobErrorHandler()
require.NotNil(t, fn)
fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) {
require.Len(t, i, 2)
assert.Equal(t, fmt.Sprintf("%s", i[1]), "test")
})
}
func TestDefaultJobPanicHandler(t *testing.T) {
defer func() {
require.Nil(t, recover())
}()
fn := DefaultJobPanicHandler()
require.NotNil(t, fn)
fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) {
require.Len(t, i, 2)
assert.Equal(t, fmt.Sprintf("%s", i[1]), "test")
})
}
func (t *JobTest) testErrorHandler() JobErrorHandler {
return func(name string, err error, logFunc JobLogFunc) {
t.executeErr = err
}
}
func (t *JobTest) testPanicHandler() JobPanicHandler {
return func(name string, i interface{}, logFunc JobLogFunc) {
t.panicValue = i
}
}
func (t *JobTest) testLogFunc() JobLogFunc {
return func(s string, level logging.Level, i ...interface{}) {
t.lastLog = fmt.Sprintf(s, i...)
t.lastMsgLevel = level
}
}
func (t *JobTest) errored() bool {
return t.executeErr != nil
}
func (t *JobTest) panicked() bool {
return t.panicValue != nil
}
func (t *JobTest) clear() {
if t.job != nil {
t.job.stop()
t.job = nil
}
t.executed = false
t.executeErr = nil
t.panicValue = nil
}
func (t *JobTest) onceJob() {
t.clear()
t.job = &Job{
Command: func(logFunc JobLogFunc) error {
t.executed = true
return nil
},
ErrorHandler: t.testErrorHandler(),
PanicHandler: t.testPanicHandler(),
Interval: 0,
Regular: false,
}
}
func (t *JobTest) onceErrorJob() {
t.clear()
t.job = &Job{
Command: func(logFunc JobLogFunc) error {
t.executed = true
return errors.New("test error")
},
ErrorHandler: t.testErrorHandler(),
PanicHandler: t.testPanicHandler(),
Interval: 0,
Regular: false,
}
}
func (t *JobTest) oncePanicJob() {
t.clear()
t.job = &Job{
Command: func(logFunc JobLogFunc) error {
t.executed = true
panic("test panic")
},
ErrorHandler: t.testErrorHandler(),
PanicHandler: t.testPanicHandler(),
Interval: 0,
Regular: false,
}
}
func (t *JobTest) regularJob() {
t.clear()
t.job = &Job{
Command: func(logFunc JobLogFunc) error {
t.executed = true
return nil
},
ErrorHandler: t.testErrorHandler(),
PanicHandler: t.testPanicHandler(),
Interval: time.Nanosecond,
Regular: true,
}
}
func (t *JobTest) Test_getWrappedFunc() {
defer func() {
require.Nil(t.T(), recover())
}()
t.clear()
t.onceJob()
fn := t.job.getWrappedFunc("job", t.testLogFunc())
require.NotNil(t.T(), fn)
fn()
assert.True(t.T(), t.executed)
assert.False(t.T(), t.errored())
assert.False(t.T(), t.panicked())
}
func (t *JobTest) Test_getWrappedFuncError() {
defer func() {
require.Nil(t.T(), recover())
}()
t.clear()
t.onceErrorJob()
fn := t.job.getWrappedFunc("job", t.testLogFunc())
require.NotNil(t.T(), fn)
fn()
assert.True(t.T(), t.executed)
assert.True(t.T(), t.errored())
assert.False(t.T(), t.panicked())
}
func (t *JobTest) Test_getWrappedFuncPanic() {
defer func() {
require.Nil(t.T(), recover())
}()
t.clear()
t.oncePanicJob()
fn := t.job.getWrappedFunc("job", t.testLogFunc())
require.NotNil(t.T(), fn)
fn()
assert.True(t.T(), t.executed)
assert.False(t.T(), t.errored())
assert.True(t.T(), t.panicked())
}
func (t *JobTest) Test_getWrappedTimerFunc() {
defer func() {
require.Nil(t.T(), recover())
}()
t.clear()
t.regularJob()
t.job.run("job", t.testLogFunc())
time.Sleep(time.Millisecond)
assert.True(t.T(), t.executed)
t.executed = false
time.Sleep(time.Millisecond)
if !t.executed {
t.clear()
t.T().Skip("job wasn't as fast as it should be! this may be an error, but also can be just bad timing")
}
t.job.stop()
time.Sleep(time.Nanosecond * 10)
t.clear()
assert.False(t.T(), t.executed)
}
func (t *JobManagerTest) SetupSuite() {
t.manager = NewJobManager()
}
func (t *JobManagerTest) Test_SetLogger() {
t.manager.logger = nil
t.manager.SetLogger(NewLogger("test", logging.ERROR, DefaultLogFormatter()))
assert.IsType(t.T(), &logging.Logger{}, t.manager.logger)
t.manager.SetLogger(nil)
assert.IsType(t.T(), &logging.Logger{}, t.manager.logger)
}
func (t *JobManagerTest) Test_SetLogging() {
t.manager.enableLogging = false
t.manager.SetLogging(true)
assert.True(t.T(), t.manager.enableLogging)
t.manager.SetLogging(false)
assert.False(t.T(), t.manager.enableLogging)
}
func (t *JobManagerTest) Test_RegisterJobNil() {
require.NotNil(t.T(), t.manager.jobs)
err := t.manager.RegisterJob("job", nil)
assert.EqualError(t.T(), err, "job shouldn't be nil")
}
func (t *JobManagerTest) Test_RegisterJob() {
require.NotNil(t.T(), t.manager.jobs)
err := t.manager.RegisterJob("job", &Job{
Command: func(log JobLogFunc) error {
t.runnerFlag = true
return nil
},
ErrorHandler: DefaultJobErrorHandler(),
PanicHandler: DefaultJobPanicHandler(),
})
assert.NoError(t.T(), err)
}
func (t *JobManagerTest) Test_RegisterJobAlreadyExists() {
require.NotNil(t.T(), t.manager.jobs)
err := t.manager.RegisterJob("job", &Job{})
assert.EqualError(t.T(), err, "job already exists")
}
func (t *JobManagerTest) Test_RunOnceSync() {
require.NotNil(t.T(), t.manager.jobs)
t.runnerFlag = false
err := t.manager.RunJobOnceSync("job")
require.NoError(t.T(), err)
assert.True(t.T(), t.runnerFlag)
}