diff --git a/core/engine.go b/core/engine.go index 7d5d16d..2de7c2f 100644 --- a/core/engine.go +++ b/core/engine.go @@ -21,6 +21,7 @@ type Engine struct { httpClient *http.Client Logger *logging.Logger csrf *CSRF + jobManager *JobManager Sessions sessions.Store Config ConfigInterface LogFormatter logging.Formatter @@ -134,6 +135,15 @@ func (e *Engine) Router() *gin.Engine { return e.ginEngine } +// JobManager will return singleton JobManager from Engine +func (e *Engine) JobManager() *JobManager { + if e.jobManager == nil { + e.jobManager = NewJobManager().SetLogger(e.Logger).SetLogging(e.Config.IsDebug()) + } + + return e.jobManager +} + // BuildHTTPClient builds HTTP client with provided configuration func (e *Engine) BuildHTTPClient(replaceDefault ...bool) *Engine { if e.Config.GetHTTPClientConfig() != nil { diff --git a/core/engine_test.go b/core/engine_test.go index 422b1f7..c9f9f18 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -142,6 +142,17 @@ func (e *EngineTest) Test_Router() { assert.NotNil(e.T(), e.engine.Router()) } +func (e *EngineTest) Test_JobManager() { + defer func() { + require.Nil(e.T(), recover()) + }() + + require.Nil(e.T(), e.engine.jobManager) + manager := e.engine.JobManager() + require.NotNil(e.T(), manager) + assert.Equal(e.T(), manager, e.engine.JobManager()) +} + func (e *EngineTest) Test_ConfigureRouter() { e.engine.TranslationsPath = testTranslationsDir e.engine.Prepare() diff --git a/core/job_manager.go b/core/job_manager.go index e5bc1c2..0b60d18 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -10,33 +10,97 @@ import ( ) // JobFunc is empty func which should be executed in a parallel goroutine -type JobFunc func() error +type JobFunc func(JobLogFunc) error + +// JobLogFunc is a function which logs data from job +type JobLogFunc func(string, logging.Level, ...interface{}) // JobErrorHandler is a function to handle jobs errors. First argument is a job name. -type JobErrorHandler func(string, error, *logging.Logger) +type JobErrorHandler func(string, error, JobLogFunc) // JobPanicHandler is a function to handle jobs panics. First argument is a job name. -type JobPanicHandler func(string, interface{}, *logging.Logger) +type JobPanicHandler func(string, interface{}, JobLogFunc) // Job represents single job. Regular job will be executed every Interval. type Job struct { Command JobFunc ErrorHandler JobErrorHandler PanicHandler JobPanicHandler - Regular bool Interval time.Duration - lastExecuted time.Time + Regular bool + active bool + stopChannel chan bool } // JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as // singleton), or jobs can be executed as regular jobs. Example initialization: // TODO example initialization type JobManager struct { - jobs *sync.Map - enableLogging bool - logger *logging.Logger - executorInterval time.Duration - executorChannel chan bool + jobs *sync.Map + enableLogging bool + logger *logging.Logger +} + +// getWrappedFunc wraps job into function +func (j *Job) getWrappedFunc(name string, log JobLogFunc) func() { + return func() { + defer func() { + if r := recover(); r != nil && j.PanicHandler != nil { + j.PanicHandler(name, r, log) + } + }() + + if err := j.Command(log); err != nil && j.ErrorHandler != nil { + j.ErrorHandler(name, err, log) + } + } +} + +// getWrappedTimerFunc returns job timer func to run in the separate goroutine +func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func() { + return func() { + for range time.NewTicker(j.Interval).C { + select { + case <-j.stopChannel: + return + default: + j.getWrappedFunc(name, log)() + } + } + } +} + +// run job +func (j *Job) run(name string, log JobLogFunc) *Job { + if j.Regular && j.Interval > 0 && !j.active { + j.stopChannel = make(chan bool) + go j.getWrappedTimerFunc(name, log)() + j.active = true + } + + return j +} + +// stop running job +func (j *Job) stop() *Job { + if j.active && j.stopChannel != nil { + j.stopChannel <- true + j.active = false + } + + return j +} + +// runOnce run job once +func (j *Job) runOnce(name string, log JobLogFunc) *Job { + go j.getWrappedFunc(name, log)() + return j +} + +// runOnceSync run job once in current goroutine +func (j *Job) runOnceSync(name string, log JobLogFunc) *Job { + j.getWrappedFunc(name, log)() + return j } // NewJobManager is a JobManager constructor @@ -44,33 +108,20 @@ func NewJobManager() *JobManager { return &JobManager{jobs: &sync.Map{}} } -// DefaultExecutorInterval is a default recommended interval for main job executor -func DefaultExecutorInterval() time.Duration { - return time.Minute -} - -// DefaultJobErrorHandler is a default error handler for a job -func DefaultJobErrorHandler(name string, err error, logger *logging.Logger) { - if err != nil && name != "" { - message := fmt.Sprintf("Job `%s` errored with an error: `%s`", name, err.Error()) - - if logger != nil { - logger.Error(message) - } else { - fmt.Print("[ERROR]", message) +// DefaultJobErrorHandler returns default error handler for a job +func DefaultJobErrorHandler() JobErrorHandler { + return func(name string, err error, log JobLogFunc) { + if err != nil && name != "" { + log("Job `%s` errored with an error: `%s`", logging.ERROR, name, err.Error()) } } } -// DefaultJobPanicHandler is a default panic handler for a job -func DefaultJobPanicHandler(name string, recoverValue interface{}, logger *logging.Logger) { - if recoverValue != nil && name != "" { - message := fmt.Sprintf("Job `%s` panicked with value: `%#v`", name, recoverValue) - - if logger != nil { - logger.Error(message) - } else { - fmt.Print("[ERROR]", message) +// DefaultJobPanicHandler returns default panic handler for a job +func DefaultJobPanicHandler() JobPanicHandler { + return func(name string, recoverValue interface{}, log JobLogFunc) { + if recoverValue != nil && name != "" { + log("Job `%s` panicked with value: `%#v`", logging.ERROR, name, recoverValue) } } } @@ -91,124 +142,135 @@ func (j *JobManager) SetLogging(enableLogging bool) *JobManager { } // RegisterJob registers new job -func (j *JobManager) RegisterJob(name string, job Job) error { - if i, ok := j.jobs.Load(name); ok { - if _, ok := j.asJob(i); ok { - return errors.New("job already exists") - } +func (j *JobManager) RegisterJob(name string, job *Job) error { + if job == nil { + return errors.New("job shouldn't be nil") + } + + if _, ok := j.FetchJob(name); ok { + return errors.New("job already exists") } j.jobs.Store(name, job) + return nil } +// UnregisterJob unregisters job if it's exists. Returns error if job doesn't exist. +func (j *JobManager) UnregisterJob(name string) error { + if i, ok := j.FetchJob(name); ok { + i.stop() + j.jobs.Delete(name) + } + + return fmt.Errorf("cannot find job `%s`", name) +} + // FetchJob fetches already exist job -func (j *JobManager) FetchJob(name string) (value Job, ok bool) { +func (j *JobManager) FetchJob(name string) (value *Job, ok bool) { if i, ok := j.jobs.Load(name); ok { if job, ok := j.asJob(i); ok { return job, ok } } - return Job{}, false + return &Job{}, false } // UpdateJob updates job -func (j *JobManager) UpdateJob(name string, job Job) error { +func (j *JobManager) UpdateJob(name string, job *Job) error { if job, ok := j.FetchJob(name); ok { - j.jobs.Delete(name) + _ = j.UnregisterJob(name) return j.RegisterJob(name, job) } return fmt.Errorf("cannot find job `%s`", name) } -// ExecuteJob executes provided job if it's exists. It's async operation and error returns only of job wasn't executed at all. -func (j *JobManager) ExecuteJob(name string, resetInterval bool) error { +// RunJob starts provided regular job if it's exists. It's async operation and error returns only of job wasn't executed at all. +func (j *JobManager) RunJob(name string) error { if job, ok := j.FetchJob(name); ok { - return j.runJob(name, job, resetInterval) + job.run(name, j.log) + return nil } return fmt.Errorf("cannot find job `%s`", name) } -// StartExecutor runs executor -func (j *JobManager) StartExecutor(executorInterval time.Duration) error { - if executorInterval <= 0 { - return errors.New("executorInterval must be higher that 0") +// StopJob stops provided regular regular job if it's exists. +func (j *JobManager) StopJob(name string) error { + if job, ok := j.FetchJob(name); ok { + job.stop() + return nil } - if j.executorChannel != nil { - return errors.New("executor is already active") - } - - j.executorInterval = executorInterval - j.executorChannel = make(chan bool) - - go func(stop chan bool) { - for _ = range time.NewTicker(j.executorInterval).C { - select { - case <-stop: - return - case <-time.After(time.Second): - j.jobs.Range(func(key, value interface{}) bool { - if job, ok := j.asJob(value); ok { - if name, ok := key.(string); ok { - if job.Regular && - job.lastExecuted.Before(time.Now()) && - time.Since(job.lastExecuted) >= job.Interval { - if err := j.runJob(name, job, true); err != nil { - j.logError("error while executing job `%s`: %s", name, err.Error()) - } - } - } - } - - return true - }) - } - } - }(j.executorChannel) - - return nil + return fmt.Errorf("cannot find job `%s`", name) } -// logError logs error -func (j *JobManager) logError(format string, args ...interface{}) { - if j.logger != nil { - j.logger.Errorf(format, args...) +// RunJobOnce starts provided job once if it exists. It's also async. +func (j *JobManager) RunJobOnce(name string) error { + if job, ok := j.FetchJob(name); ok { + job.runOnce(name, j.log) + return nil } - fmt.Printf(format, args...) + return fmt.Errorf("cannot find job `%s`", name) +} + +// RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work. +func (j *JobManager) RunJobOnceSync(name string) error { + if job, ok := j.FetchJob(name); ok { + job.runOnceSync(name, j.log) + return nil + } + + return fmt.Errorf("cannot find job `%s`", name) +} + +// log logs via logger or as plaintext +func (j *JobManager) log(format string, severity logging.Level, args ...interface{}) { + if !j.enableLogging { + return + } + + if j.logger != nil { + switch severity { + case logging.CRITICAL: + j.logger.Criticalf(format, args...) + case logging.ERROR: + j.logger.Errorf(format, args...) + case logging.WARNING: + j.logger.Warningf(format, args...) + case logging.NOTICE: + j.logger.Noticef(format, args...) + case logging.INFO: + j.logger.Infof(format, args...) + case logging.DEBUG: + j.logger.Debugf(format, args...) + } + } + + switch severity { + case logging.CRITICAL: + fmt.Print("[CRITICAL] ", fmt.Sprintf(format, args...)) + case logging.ERROR: + fmt.Print("[ERROR] ", fmt.Sprintf(format, args...)) + case logging.WARNING: + fmt.Print("[WARNING] ", fmt.Sprintf(format, args...)) + case logging.NOTICE: + fmt.Print("[NOTICE] ", fmt.Sprintf(format, args...)) + case logging.INFO: + fmt.Print("[INFO] ", fmt.Sprintf(format, args...)) + case logging.DEBUG: + fmt.Print("[DEBUG] ", fmt.Sprintf(format, args...)) + } } // asJob casts interface to a Job -func (j *JobManager) asJob(v interface{}) (Job, bool) { - if job, ok := v.(Job); ok { +func (j *JobManager) asJob(v interface{}) (*Job, bool) { + if job, ok := v.(*Job); ok { return job, ok } - return Job{}, false -} - -// runJob executes provided job from object. It's async operation and error returns only of job wasn't executed at all. -func (j *JobManager) runJob(name string, job Job, resetInterval bool) error { - go func() { - defer func() { - if r := recover(); r != nil && job.PanicHandler != nil { - job.PanicHandler(name, r, j.logger) - } - }() - - if err := job.Command(); err != nil && job.ErrorHandler != nil { - job.ErrorHandler(name, err, j.logger) - } - }() - - if resetInterval { - job.lastExecuted = time.Now() - return j.UpdateJob(name, job) - } - - return fmt.Errorf("cannot find job `%s`", name) + return &Job{}, false } diff --git a/core/job_manager_test.go b/core/job_manager_test.go new file mode 100644 index 0000000..66cd429 --- /dev/null +++ b/core/job_manager_test.go @@ -0,0 +1,279 @@ +package core + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/op/go-logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type JobTest struct { + suite.Suite + job *Job + executed bool + executeErr error + lastLog string + lastMsgLevel logging.Level + panicValue interface{} +} + +type JobManagerTest struct { + suite.Suite + manager *JobManager + runnerFlag bool +} + +func TestJob(t *testing.T) { + suite.Run(t, new(JobTest)) +} + +func TestJobManager(t *testing.T) { + suite.Run(t, new(JobManagerTest)) +} + +func TestDefaultJobErrorHandler(t *testing.T) { + defer func() { + require.Nil(t, recover()) + }() + + fn := DefaultJobErrorHandler() + require.NotNil(t, fn) + fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) { + require.Len(t, i, 2) + assert.Equal(t, fmt.Sprintf("%s", i[1]), "test") + }) +} + +func TestDefaultJobPanicHandler(t *testing.T) { + defer func() { + require.Nil(t, recover()) + }() + + fn := DefaultJobPanicHandler() + require.NotNil(t, fn) + fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) { + require.Len(t, i, 2) + assert.Equal(t, fmt.Sprintf("%s", i[1]), "test") + }) +} + +func (t *JobTest) testErrorHandler() JobErrorHandler { + return func(name string, err error, logFunc JobLogFunc) { + t.executeErr = err + } +} + +func (t *JobTest) testPanicHandler() JobPanicHandler { + return func(name string, i interface{}, logFunc JobLogFunc) { + t.panicValue = i + } +} + +func (t *JobTest) testLogFunc() JobLogFunc { + return func(s string, level logging.Level, i ...interface{}) { + t.lastLog = fmt.Sprintf(s, i...) + t.lastMsgLevel = level + } +} + +func (t *JobTest) errored() bool { + return t.executeErr != nil +} + +func (t *JobTest) panicked() bool { + return t.panicValue != nil +} + +func (t *JobTest) clear() { + if t.job != nil { + t.job.stop() + t.job = nil + } + + t.executed = false + t.executeErr = nil + t.panicValue = nil +} + +func (t *JobTest) onceJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + return nil + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: 0, + Regular: false, + } +} + +func (t *JobTest) onceErrorJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + return errors.New("test error") + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: 0, + Regular: false, + } +} + +func (t *JobTest) oncePanicJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + panic("test panic") + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: 0, + Regular: false, + } +} + +func (t *JobTest) regularJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + return nil + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: time.Nanosecond, + Regular: true, + } +} + +func (t *JobTest) Test_getWrappedFunc() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.onceJob() + fn := t.job.getWrappedFunc("job", t.testLogFunc()) + require.NotNil(t.T(), fn) + fn() + assert.True(t.T(), t.executed) + assert.False(t.T(), t.errored()) + assert.False(t.T(), t.panicked()) +} + +func (t *JobTest) Test_getWrappedFuncError() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.onceErrorJob() + fn := t.job.getWrappedFunc("job", t.testLogFunc()) + require.NotNil(t.T(), fn) + fn() + assert.True(t.T(), t.executed) + assert.True(t.T(), t.errored()) + assert.False(t.T(), t.panicked()) +} + +func (t *JobTest) Test_getWrappedFuncPanic() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.oncePanicJob() + fn := t.job.getWrappedFunc("job", t.testLogFunc()) + require.NotNil(t.T(), fn) + fn() + assert.True(t.T(), t.executed) + assert.False(t.T(), t.errored()) + assert.True(t.T(), t.panicked()) +} + +func (t *JobTest) Test_getWrappedTimerFunc() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.regularJob() + t.job.run("job", t.testLogFunc()) + time.Sleep(time.Millisecond) + assert.True(t.T(), t.executed) + t.executed = false + time.Sleep(time.Millisecond) + if !t.executed { + t.clear() + t.T().Skip("job wasn't as fast as it should be! this may be an error, but also can be just bad timing") + } + t.job.stop() + time.Sleep(time.Nanosecond * 10) + t.clear() + assert.False(t.T(), t.executed) +} + +func (t *JobManagerTest) SetupSuite() { + t.manager = NewJobManager() +} + +func (t *JobManagerTest) Test_SetLogger() { + t.manager.logger = nil + t.manager.SetLogger(NewLogger("test", logging.ERROR, DefaultLogFormatter())) + assert.IsType(t.T(), &logging.Logger{}, t.manager.logger) + + t.manager.SetLogger(nil) + assert.IsType(t.T(), &logging.Logger{}, t.manager.logger) +} + +func (t *JobManagerTest) Test_SetLogging() { + t.manager.enableLogging = false + t.manager.SetLogging(true) + assert.True(t.T(), t.manager.enableLogging) + + t.manager.SetLogging(false) + assert.False(t.T(), t.manager.enableLogging) +} + +func (t *JobManagerTest) Test_RegisterJobNil() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RegisterJob("job", nil) + assert.EqualError(t.T(), err, "job shouldn't be nil") +} + +func (t *JobManagerTest) Test_RegisterJob() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RegisterJob("job", &Job{ + Command: func(log JobLogFunc) error { + t.runnerFlag = true + return nil + }, + ErrorHandler: DefaultJobErrorHandler(), + PanicHandler: DefaultJobPanicHandler(), + }) + assert.NoError(t.T(), err) +} + +func (t *JobManagerTest) Test_RegisterJobAlreadyExists() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RegisterJob("job", &Job{}) + assert.EqualError(t.T(), err, "job already exists") +} + +func (t *JobManagerTest) Test_RunOnceSync() { + require.NotNil(t.T(), t.manager.jobs) + t.runnerFlag = false + err := t.manager.RunJobOnceSync("job") + require.NoError(t.T(), err) + assert.True(t.T(), t.runnerFlag) +}