From c191d2c8e0ac6bc0c93b91bc10ef88853f750d93 Mon Sep 17 00:00:00 2001 From: Pavel Date: Mon, 16 Dec 2019 17:24:40 +0300 Subject: [PATCH 1/8] job manager draft --- core/job_manager.go | 214 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 core/job_manager.go diff --git a/core/job_manager.go b/core/job_manager.go new file mode 100644 index 0000000..e5bc1c2 --- /dev/null +++ b/core/job_manager.go @@ -0,0 +1,214 @@ +package core + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/op/go-logging" +) + +// JobFunc is empty func which should be executed in a parallel goroutine +type JobFunc func() error + +// JobErrorHandler is a function to handle jobs errors. First argument is a job name. +type JobErrorHandler func(string, error, *logging.Logger) + +// JobPanicHandler is a function to handle jobs panics. First argument is a job name. +type JobPanicHandler func(string, interface{}, *logging.Logger) + +// Job represents single job. Regular job will be executed every Interval. +type Job struct { + Command JobFunc + ErrorHandler JobErrorHandler + PanicHandler JobPanicHandler + Regular bool + Interval time.Duration + lastExecuted time.Time +} + +// JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as +// singleton), or jobs can be executed as regular jobs. Example initialization: +// TODO example initialization +type JobManager struct { + jobs *sync.Map + enableLogging bool + logger *logging.Logger + executorInterval time.Duration + executorChannel chan bool +} + +// NewJobManager is a JobManager constructor +func NewJobManager() *JobManager { + return &JobManager{jobs: &sync.Map{}} +} + +// DefaultExecutorInterval is a default recommended interval for main job executor +func DefaultExecutorInterval() time.Duration { + return time.Minute +} + +// DefaultJobErrorHandler is a default error handler for a job +func DefaultJobErrorHandler(name string, err error, logger *logging.Logger) { + if err != nil && name != "" { + message := fmt.Sprintf("Job `%s` errored with an error: `%s`", name, err.Error()) + + if logger != nil { + logger.Error(message) + } else { + fmt.Print("[ERROR]", message) + } + } +} + +// DefaultJobPanicHandler is a default panic handler for a job +func DefaultJobPanicHandler(name string, recoverValue interface{}, logger *logging.Logger) { + if recoverValue != nil && name != "" { + message := fmt.Sprintf("Job `%s` panicked with value: `%#v`", name, recoverValue) + + if logger != nil { + logger.Error(message) + } else { + fmt.Print("[ERROR]", message) + } + } +} + +// SetLogger sets logger into JobManager +func (j *JobManager) SetLogger(logger *logging.Logger) *JobManager { + if logger != nil { + j.logger = logger + } + + return j +} + +// SetLogging enables or disables JobManager logging +func (j *JobManager) SetLogging(enableLogging bool) *JobManager { + j.enableLogging = enableLogging + return j +} + +// RegisterJob registers new job +func (j *JobManager) RegisterJob(name string, job Job) error { + if i, ok := j.jobs.Load(name); ok { + if _, ok := j.asJob(i); ok { + return errors.New("job already exists") + } + } + + j.jobs.Store(name, job) + return nil +} + +// FetchJob fetches already exist job +func (j *JobManager) FetchJob(name string) (value Job, ok bool) { + if i, ok := j.jobs.Load(name); ok { + if job, ok := j.asJob(i); ok { + return job, ok + } + } + + return Job{}, false +} + +// UpdateJob updates job +func (j *JobManager) UpdateJob(name string, job Job) error { + if job, ok := j.FetchJob(name); ok { + j.jobs.Delete(name) + return j.RegisterJob(name, job) + } + + return fmt.Errorf("cannot find job `%s`", name) +} + +// ExecuteJob executes provided job if it's exists. It's async operation and error returns only of job wasn't executed at all. +func (j *JobManager) ExecuteJob(name string, resetInterval bool) error { + if job, ok := j.FetchJob(name); ok { + return j.runJob(name, job, resetInterval) + } + + return fmt.Errorf("cannot find job `%s`", name) +} + +// StartExecutor runs executor +func (j *JobManager) StartExecutor(executorInterval time.Duration) error { + if executorInterval <= 0 { + return errors.New("executorInterval must be higher that 0") + } + + if j.executorChannel != nil { + return errors.New("executor is already active") + } + + j.executorInterval = executorInterval + j.executorChannel = make(chan bool) + + go func(stop chan bool) { + for _ = range time.NewTicker(j.executorInterval).C { + select { + case <-stop: + return + case <-time.After(time.Second): + j.jobs.Range(func(key, value interface{}) bool { + if job, ok := j.asJob(value); ok { + if name, ok := key.(string); ok { + if job.Regular && + job.lastExecuted.Before(time.Now()) && + time.Since(job.lastExecuted) >= job.Interval { + if err := j.runJob(name, job, true); err != nil { + j.logError("error while executing job `%s`: %s", name, err.Error()) + } + } + } + } + + return true + }) + } + } + }(j.executorChannel) + + return nil +} + +// logError logs error +func (j *JobManager) logError(format string, args ...interface{}) { + if j.logger != nil { + j.logger.Errorf(format, args...) + } + + fmt.Printf(format, args...) +} + +// asJob casts interface to a Job +func (j *JobManager) asJob(v interface{}) (Job, bool) { + if job, ok := v.(Job); ok { + return job, ok + } + + return Job{}, false +} + +// runJob executes provided job from object. It's async operation and error returns only of job wasn't executed at all. +func (j *JobManager) runJob(name string, job Job, resetInterval bool) error { + go func() { + defer func() { + if r := recover(); r != nil && job.PanicHandler != nil { + job.PanicHandler(name, r, j.logger) + } + }() + + if err := job.Command(); err != nil && job.ErrorHandler != nil { + job.ErrorHandler(name, err, j.logger) + } + }() + + if resetInterval { + job.lastExecuted = time.Now() + return j.UpdateJob(name, job) + } + + return fmt.Errorf("cannot find job `%s`", name) +} From ee1de01b31bb77b3e3caa79b6703eaec1b4bcc35 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 10:39:48 +0300 Subject: [PATCH 2/8] changes for JobManager internal logic, partial tests --- core/engine.go | 10 ++ core/engine_test.go | 11 ++ core/job_manager.go | 286 ++++++++++++++++++++++++--------------- core/job_manager_test.go | 279 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 474 insertions(+), 112 deletions(-) create mode 100644 core/job_manager_test.go diff --git a/core/engine.go b/core/engine.go index 7d5d16d..2de7c2f 100644 --- a/core/engine.go +++ b/core/engine.go @@ -21,6 +21,7 @@ type Engine struct { httpClient *http.Client Logger *logging.Logger csrf *CSRF + jobManager *JobManager Sessions sessions.Store Config ConfigInterface LogFormatter logging.Formatter @@ -134,6 +135,15 @@ func (e *Engine) Router() *gin.Engine { return e.ginEngine } +// JobManager will return singleton JobManager from Engine +func (e *Engine) JobManager() *JobManager { + if e.jobManager == nil { + e.jobManager = NewJobManager().SetLogger(e.Logger).SetLogging(e.Config.IsDebug()) + } + + return e.jobManager +} + // BuildHTTPClient builds HTTP client with provided configuration func (e *Engine) BuildHTTPClient(replaceDefault ...bool) *Engine { if e.Config.GetHTTPClientConfig() != nil { diff --git a/core/engine_test.go b/core/engine_test.go index 422b1f7..c9f9f18 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -142,6 +142,17 @@ func (e *EngineTest) Test_Router() { assert.NotNil(e.T(), e.engine.Router()) } +func (e *EngineTest) Test_JobManager() { + defer func() { + require.Nil(e.T(), recover()) + }() + + require.Nil(e.T(), e.engine.jobManager) + manager := e.engine.JobManager() + require.NotNil(e.T(), manager) + assert.Equal(e.T(), manager, e.engine.JobManager()) +} + func (e *EngineTest) Test_ConfigureRouter() { e.engine.TranslationsPath = testTranslationsDir e.engine.Prepare() diff --git a/core/job_manager.go b/core/job_manager.go index e5bc1c2..0b60d18 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -10,33 +10,97 @@ import ( ) // JobFunc is empty func which should be executed in a parallel goroutine -type JobFunc func() error +type JobFunc func(JobLogFunc) error + +// JobLogFunc is a function which logs data from job +type JobLogFunc func(string, logging.Level, ...interface{}) // JobErrorHandler is a function to handle jobs errors. First argument is a job name. -type JobErrorHandler func(string, error, *logging.Logger) +type JobErrorHandler func(string, error, JobLogFunc) // JobPanicHandler is a function to handle jobs panics. First argument is a job name. -type JobPanicHandler func(string, interface{}, *logging.Logger) +type JobPanicHandler func(string, interface{}, JobLogFunc) // Job represents single job. Regular job will be executed every Interval. type Job struct { Command JobFunc ErrorHandler JobErrorHandler PanicHandler JobPanicHandler - Regular bool Interval time.Duration - lastExecuted time.Time + Regular bool + active bool + stopChannel chan bool } // JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as // singleton), or jobs can be executed as regular jobs. Example initialization: // TODO example initialization type JobManager struct { - jobs *sync.Map - enableLogging bool - logger *logging.Logger - executorInterval time.Duration - executorChannel chan bool + jobs *sync.Map + enableLogging bool + logger *logging.Logger +} + +// getWrappedFunc wraps job into function +func (j *Job) getWrappedFunc(name string, log JobLogFunc) func() { + return func() { + defer func() { + if r := recover(); r != nil && j.PanicHandler != nil { + j.PanicHandler(name, r, log) + } + }() + + if err := j.Command(log); err != nil && j.ErrorHandler != nil { + j.ErrorHandler(name, err, log) + } + } +} + +// getWrappedTimerFunc returns job timer func to run in the separate goroutine +func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func() { + return func() { + for range time.NewTicker(j.Interval).C { + select { + case <-j.stopChannel: + return + default: + j.getWrappedFunc(name, log)() + } + } + } +} + +// run job +func (j *Job) run(name string, log JobLogFunc) *Job { + if j.Regular && j.Interval > 0 && !j.active { + j.stopChannel = make(chan bool) + go j.getWrappedTimerFunc(name, log)() + j.active = true + } + + return j +} + +// stop running job +func (j *Job) stop() *Job { + if j.active && j.stopChannel != nil { + j.stopChannel <- true + j.active = false + } + + return j +} + +// runOnce run job once +func (j *Job) runOnce(name string, log JobLogFunc) *Job { + go j.getWrappedFunc(name, log)() + return j +} + +// runOnceSync run job once in current goroutine +func (j *Job) runOnceSync(name string, log JobLogFunc) *Job { + j.getWrappedFunc(name, log)() + return j } // NewJobManager is a JobManager constructor @@ -44,33 +108,20 @@ func NewJobManager() *JobManager { return &JobManager{jobs: &sync.Map{}} } -// DefaultExecutorInterval is a default recommended interval for main job executor -func DefaultExecutorInterval() time.Duration { - return time.Minute -} - -// DefaultJobErrorHandler is a default error handler for a job -func DefaultJobErrorHandler(name string, err error, logger *logging.Logger) { - if err != nil && name != "" { - message := fmt.Sprintf("Job `%s` errored with an error: `%s`", name, err.Error()) - - if logger != nil { - logger.Error(message) - } else { - fmt.Print("[ERROR]", message) +// DefaultJobErrorHandler returns default error handler for a job +func DefaultJobErrorHandler() JobErrorHandler { + return func(name string, err error, log JobLogFunc) { + if err != nil && name != "" { + log("Job `%s` errored with an error: `%s`", logging.ERROR, name, err.Error()) } } } -// DefaultJobPanicHandler is a default panic handler for a job -func DefaultJobPanicHandler(name string, recoverValue interface{}, logger *logging.Logger) { - if recoverValue != nil && name != "" { - message := fmt.Sprintf("Job `%s` panicked with value: `%#v`", name, recoverValue) - - if logger != nil { - logger.Error(message) - } else { - fmt.Print("[ERROR]", message) +// DefaultJobPanicHandler returns default panic handler for a job +func DefaultJobPanicHandler() JobPanicHandler { + return func(name string, recoverValue interface{}, log JobLogFunc) { + if recoverValue != nil && name != "" { + log("Job `%s` panicked with value: `%#v`", logging.ERROR, name, recoverValue) } } } @@ -91,124 +142,135 @@ func (j *JobManager) SetLogging(enableLogging bool) *JobManager { } // RegisterJob registers new job -func (j *JobManager) RegisterJob(name string, job Job) error { - if i, ok := j.jobs.Load(name); ok { - if _, ok := j.asJob(i); ok { - return errors.New("job already exists") - } +func (j *JobManager) RegisterJob(name string, job *Job) error { + if job == nil { + return errors.New("job shouldn't be nil") + } + + if _, ok := j.FetchJob(name); ok { + return errors.New("job already exists") } j.jobs.Store(name, job) + return nil } +// UnregisterJob unregisters job if it's exists. Returns error if job doesn't exist. +func (j *JobManager) UnregisterJob(name string) error { + if i, ok := j.FetchJob(name); ok { + i.stop() + j.jobs.Delete(name) + } + + return fmt.Errorf("cannot find job `%s`", name) +} + // FetchJob fetches already exist job -func (j *JobManager) FetchJob(name string) (value Job, ok bool) { +func (j *JobManager) FetchJob(name string) (value *Job, ok bool) { if i, ok := j.jobs.Load(name); ok { if job, ok := j.asJob(i); ok { return job, ok } } - return Job{}, false + return &Job{}, false } // UpdateJob updates job -func (j *JobManager) UpdateJob(name string, job Job) error { +func (j *JobManager) UpdateJob(name string, job *Job) error { if job, ok := j.FetchJob(name); ok { - j.jobs.Delete(name) + _ = j.UnregisterJob(name) return j.RegisterJob(name, job) } return fmt.Errorf("cannot find job `%s`", name) } -// ExecuteJob executes provided job if it's exists. It's async operation and error returns only of job wasn't executed at all. -func (j *JobManager) ExecuteJob(name string, resetInterval bool) error { +// RunJob starts provided regular job if it's exists. It's async operation and error returns only of job wasn't executed at all. +func (j *JobManager) RunJob(name string) error { if job, ok := j.FetchJob(name); ok { - return j.runJob(name, job, resetInterval) + job.run(name, j.log) + return nil } return fmt.Errorf("cannot find job `%s`", name) } -// StartExecutor runs executor -func (j *JobManager) StartExecutor(executorInterval time.Duration) error { - if executorInterval <= 0 { - return errors.New("executorInterval must be higher that 0") +// StopJob stops provided regular regular job if it's exists. +func (j *JobManager) StopJob(name string) error { + if job, ok := j.FetchJob(name); ok { + job.stop() + return nil } - if j.executorChannel != nil { - return errors.New("executor is already active") - } - - j.executorInterval = executorInterval - j.executorChannel = make(chan bool) - - go func(stop chan bool) { - for _ = range time.NewTicker(j.executorInterval).C { - select { - case <-stop: - return - case <-time.After(time.Second): - j.jobs.Range(func(key, value interface{}) bool { - if job, ok := j.asJob(value); ok { - if name, ok := key.(string); ok { - if job.Regular && - job.lastExecuted.Before(time.Now()) && - time.Since(job.lastExecuted) >= job.Interval { - if err := j.runJob(name, job, true); err != nil { - j.logError("error while executing job `%s`: %s", name, err.Error()) - } - } - } - } - - return true - }) - } - } - }(j.executorChannel) - - return nil + return fmt.Errorf("cannot find job `%s`", name) } -// logError logs error -func (j *JobManager) logError(format string, args ...interface{}) { - if j.logger != nil { - j.logger.Errorf(format, args...) +// RunJobOnce starts provided job once if it exists. It's also async. +func (j *JobManager) RunJobOnce(name string) error { + if job, ok := j.FetchJob(name); ok { + job.runOnce(name, j.log) + return nil } - fmt.Printf(format, args...) + return fmt.Errorf("cannot find job `%s`", name) +} + +// RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work. +func (j *JobManager) RunJobOnceSync(name string) error { + if job, ok := j.FetchJob(name); ok { + job.runOnceSync(name, j.log) + return nil + } + + return fmt.Errorf("cannot find job `%s`", name) +} + +// log logs via logger or as plaintext +func (j *JobManager) log(format string, severity logging.Level, args ...interface{}) { + if !j.enableLogging { + return + } + + if j.logger != nil { + switch severity { + case logging.CRITICAL: + j.logger.Criticalf(format, args...) + case logging.ERROR: + j.logger.Errorf(format, args...) + case logging.WARNING: + j.logger.Warningf(format, args...) + case logging.NOTICE: + j.logger.Noticef(format, args...) + case logging.INFO: + j.logger.Infof(format, args...) + case logging.DEBUG: + j.logger.Debugf(format, args...) + } + } + + switch severity { + case logging.CRITICAL: + fmt.Print("[CRITICAL] ", fmt.Sprintf(format, args...)) + case logging.ERROR: + fmt.Print("[ERROR] ", fmt.Sprintf(format, args...)) + case logging.WARNING: + fmt.Print("[WARNING] ", fmt.Sprintf(format, args...)) + case logging.NOTICE: + fmt.Print("[NOTICE] ", fmt.Sprintf(format, args...)) + case logging.INFO: + fmt.Print("[INFO] ", fmt.Sprintf(format, args...)) + case logging.DEBUG: + fmt.Print("[DEBUG] ", fmt.Sprintf(format, args...)) + } } // asJob casts interface to a Job -func (j *JobManager) asJob(v interface{}) (Job, bool) { - if job, ok := v.(Job); ok { +func (j *JobManager) asJob(v interface{}) (*Job, bool) { + if job, ok := v.(*Job); ok { return job, ok } - return Job{}, false -} - -// runJob executes provided job from object. It's async operation and error returns only of job wasn't executed at all. -func (j *JobManager) runJob(name string, job Job, resetInterval bool) error { - go func() { - defer func() { - if r := recover(); r != nil && job.PanicHandler != nil { - job.PanicHandler(name, r, j.logger) - } - }() - - if err := job.Command(); err != nil && job.ErrorHandler != nil { - job.ErrorHandler(name, err, j.logger) - } - }() - - if resetInterval { - job.lastExecuted = time.Now() - return j.UpdateJob(name, job) - } - - return fmt.Errorf("cannot find job `%s`", name) + return &Job{}, false } diff --git a/core/job_manager_test.go b/core/job_manager_test.go new file mode 100644 index 0000000..66cd429 --- /dev/null +++ b/core/job_manager_test.go @@ -0,0 +1,279 @@ +package core + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/op/go-logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type JobTest struct { + suite.Suite + job *Job + executed bool + executeErr error + lastLog string + lastMsgLevel logging.Level + panicValue interface{} +} + +type JobManagerTest struct { + suite.Suite + manager *JobManager + runnerFlag bool +} + +func TestJob(t *testing.T) { + suite.Run(t, new(JobTest)) +} + +func TestJobManager(t *testing.T) { + suite.Run(t, new(JobManagerTest)) +} + +func TestDefaultJobErrorHandler(t *testing.T) { + defer func() { + require.Nil(t, recover()) + }() + + fn := DefaultJobErrorHandler() + require.NotNil(t, fn) + fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) { + require.Len(t, i, 2) + assert.Equal(t, fmt.Sprintf("%s", i[1]), "test") + }) +} + +func TestDefaultJobPanicHandler(t *testing.T) { + defer func() { + require.Nil(t, recover()) + }() + + fn := DefaultJobPanicHandler() + require.NotNil(t, fn) + fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) { + require.Len(t, i, 2) + assert.Equal(t, fmt.Sprintf("%s", i[1]), "test") + }) +} + +func (t *JobTest) testErrorHandler() JobErrorHandler { + return func(name string, err error, logFunc JobLogFunc) { + t.executeErr = err + } +} + +func (t *JobTest) testPanicHandler() JobPanicHandler { + return func(name string, i interface{}, logFunc JobLogFunc) { + t.panicValue = i + } +} + +func (t *JobTest) testLogFunc() JobLogFunc { + return func(s string, level logging.Level, i ...interface{}) { + t.lastLog = fmt.Sprintf(s, i...) + t.lastMsgLevel = level + } +} + +func (t *JobTest) errored() bool { + return t.executeErr != nil +} + +func (t *JobTest) panicked() bool { + return t.panicValue != nil +} + +func (t *JobTest) clear() { + if t.job != nil { + t.job.stop() + t.job = nil + } + + t.executed = false + t.executeErr = nil + t.panicValue = nil +} + +func (t *JobTest) onceJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + return nil + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: 0, + Regular: false, + } +} + +func (t *JobTest) onceErrorJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + return errors.New("test error") + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: 0, + Regular: false, + } +} + +func (t *JobTest) oncePanicJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + panic("test panic") + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: 0, + Regular: false, + } +} + +func (t *JobTest) regularJob() { + t.clear() + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.executed = true + return nil + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: time.Nanosecond, + Regular: true, + } +} + +func (t *JobTest) Test_getWrappedFunc() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.onceJob() + fn := t.job.getWrappedFunc("job", t.testLogFunc()) + require.NotNil(t.T(), fn) + fn() + assert.True(t.T(), t.executed) + assert.False(t.T(), t.errored()) + assert.False(t.T(), t.panicked()) +} + +func (t *JobTest) Test_getWrappedFuncError() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.onceErrorJob() + fn := t.job.getWrappedFunc("job", t.testLogFunc()) + require.NotNil(t.T(), fn) + fn() + assert.True(t.T(), t.executed) + assert.True(t.T(), t.errored()) + assert.False(t.T(), t.panicked()) +} + +func (t *JobTest) Test_getWrappedFuncPanic() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.oncePanicJob() + fn := t.job.getWrappedFunc("job", t.testLogFunc()) + require.NotNil(t.T(), fn) + fn() + assert.True(t.T(), t.executed) + assert.False(t.T(), t.errored()) + assert.True(t.T(), t.panicked()) +} + +func (t *JobTest) Test_getWrappedTimerFunc() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.clear() + t.regularJob() + t.job.run("job", t.testLogFunc()) + time.Sleep(time.Millisecond) + assert.True(t.T(), t.executed) + t.executed = false + time.Sleep(time.Millisecond) + if !t.executed { + t.clear() + t.T().Skip("job wasn't as fast as it should be! this may be an error, but also can be just bad timing") + } + t.job.stop() + time.Sleep(time.Nanosecond * 10) + t.clear() + assert.False(t.T(), t.executed) +} + +func (t *JobManagerTest) SetupSuite() { + t.manager = NewJobManager() +} + +func (t *JobManagerTest) Test_SetLogger() { + t.manager.logger = nil + t.manager.SetLogger(NewLogger("test", logging.ERROR, DefaultLogFormatter())) + assert.IsType(t.T(), &logging.Logger{}, t.manager.logger) + + t.manager.SetLogger(nil) + assert.IsType(t.T(), &logging.Logger{}, t.manager.logger) +} + +func (t *JobManagerTest) Test_SetLogging() { + t.manager.enableLogging = false + t.manager.SetLogging(true) + assert.True(t.T(), t.manager.enableLogging) + + t.manager.SetLogging(false) + assert.False(t.T(), t.manager.enableLogging) +} + +func (t *JobManagerTest) Test_RegisterJobNil() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RegisterJob("job", nil) + assert.EqualError(t.T(), err, "job shouldn't be nil") +} + +func (t *JobManagerTest) Test_RegisterJob() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RegisterJob("job", &Job{ + Command: func(log JobLogFunc) error { + t.runnerFlag = true + return nil + }, + ErrorHandler: DefaultJobErrorHandler(), + PanicHandler: DefaultJobPanicHandler(), + }) + assert.NoError(t.T(), err) +} + +func (t *JobManagerTest) Test_RegisterJobAlreadyExists() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RegisterJob("job", &Job{}) + assert.EqualError(t.T(), err, "job already exists") +} + +func (t *JobManagerTest) Test_RunOnceSync() { + require.NotNil(t.T(), t.manager.jobs) + t.runnerFlag = false + err := t.manager.RunJobOnceSync("job") + require.NoError(t.T(), err) + assert.True(t.T(), t.runnerFlag) +} From 37d815ba34c19614e75214afcc0a4e62edb8118c Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 12:16:54 +0300 Subject: [PATCH 3/8] more tests, prevent test hangup --- .travis.yml | 2 +- core/job_manager.go | 11 +- core/job_manager_test.go | 235 ++++++++++++++++++++++++++++++++------- 3 files changed, 196 insertions(+), 52 deletions(-) diff --git a/.travis.yml b/.travis.yml index 12de6d3..4821ce2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,6 @@ go: before_install: - go mod tidy script: - - go test ./... -v -cpu 2 -race -cover -coverprofile=coverage.txt -covermode=atomic + - go test ./... -v -cpu 2 -timeout 1m -race -cover -coverprofile=coverage.txt -covermode=atomic after_success: - bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/core/job_manager.go b/core/job_manager.go index 0b60d18..6d8133f 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -169,7 +169,7 @@ func (j *JobManager) UnregisterJob(name string) error { // FetchJob fetches already exist job func (j *JobManager) FetchJob(name string) (value *Job, ok bool) { if i, ok := j.jobs.Load(name); ok { - if job, ok := j.asJob(i); ok { + if job, ok := i.(*Job); ok { return job, ok } } @@ -265,12 +265,3 @@ func (j *JobManager) log(format string, severity logging.Level, args ...interfac fmt.Print("[DEBUG] ", fmt.Sprintf(format, args...)) } } - -// asJob casts interface to a Job -func (j *JobManager) asJob(v interface{}) (*Job, bool) { - if job, ok := v.(*Job); ok { - return job, ok - } - - return &Job{}, false -} diff --git a/core/job_manager_test.go b/core/job_manager_test.go index 66cd429..b000989 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -3,6 +3,7 @@ package core import ( "errors" "fmt" + "math/rand" "testing" "time" @@ -15,11 +16,13 @@ import ( type JobTest struct { suite.Suite job *Job - executed bool - executeErr error + syncBool bool + executedChan chan bool + randomNumber chan int + executeErr chan error + panicValue chan interface{} lastLog string lastMsgLevel logging.Level - panicValue interface{} } type JobManagerTest struct { @@ -64,13 +67,13 @@ func TestDefaultJobPanicHandler(t *testing.T) { func (t *JobTest) testErrorHandler() JobErrorHandler { return func(name string, err error, logFunc JobLogFunc) { - t.executeErr = err + t.executeErr <- err } } func (t *JobTest) testPanicHandler() JobPanicHandler { return func(name string, i interface{}, logFunc JobLogFunc) { - t.panicValue = i + t.panicValue <- i } } @@ -81,12 +84,43 @@ func (t *JobTest) testLogFunc() JobLogFunc { } } -func (t *JobTest) errored() bool { - return t.executeErr != nil +func (t *JobTest) executed(wait time.Duration, defaultVal bool) bool { + if t.executedChan == nil { + return defaultVal + } + + select { + case c := <-t.executedChan: + return c + case <-time.After(wait): + return defaultVal + } } -func (t *JobTest) panicked() bool { - return t.panicValue != nil +func (t *JobTest) errored(wait time.Duration) bool { + if t.executeErr == nil { + return false + } + + select { + case c := <-t.executeErr: + return c != nil + case <-time.After(wait): + return false + } +} + +func (t *JobTest) panicked(wait time.Duration) bool { + if t.panicValue == nil { + return false + } + + select { + case c := <-t.panicValue: + return c != nil + case <-time.After(wait): + return false + } } func (t *JobTest) clear() { @@ -95,16 +129,18 @@ func (t *JobTest) clear() { t.job = nil } - t.executed = false - t.executeErr = nil - t.panicValue = nil + t.syncBool = false + t.randomNumber = make(chan int) + t.executedChan = make(chan bool) + t.executeErr = make(chan error) + t.panicValue = make(chan interface{}) } func (t *JobTest) onceJob() { t.clear() t.job = &Job{ Command: func(logFunc JobLogFunc) error { - t.executed = true + t.executedChan <- true return nil }, ErrorHandler: t.testErrorHandler(), @@ -118,7 +154,7 @@ func (t *JobTest) onceErrorJob() { t.clear() t.job = &Job{ Command: func(logFunc JobLogFunc) error { - t.executed = true + t.executedChan <- true return errors.New("test error") }, ErrorHandler: t.testErrorHandler(), @@ -132,7 +168,7 @@ func (t *JobTest) oncePanicJob() { t.clear() t.job = &Job{ Command: func(logFunc JobLogFunc) error { - t.executed = true + t.executedChan <- true panic("test panic") }, ErrorHandler: t.testErrorHandler(), @@ -144,14 +180,31 @@ func (t *JobTest) oncePanicJob() { func (t *JobTest) regularJob() { t.clear() + rand.Seed(time.Now().UnixNano()) t.job = &Job{ Command: func(logFunc JobLogFunc) error { - t.executed = true + t.executedChan <- true + t.randomNumber <- rand.Int() return nil }, ErrorHandler: t.testErrorHandler(), PanicHandler: t.testPanicHandler(), - Interval: time.Nanosecond, + Interval: time.Millisecond, + Regular: true, + } +} + +func (t *JobTest) regularSyncJob() { + t.clear() + rand.Seed(time.Now().UnixNano()) + t.job = &Job{ + Command: func(logFunc JobLogFunc) error { + t.syncBool = true + return nil + }, + ErrorHandler: t.testErrorHandler(), + PanicHandler: t.testPanicHandler(), + Interval: time.Millisecond, Regular: true, } } @@ -165,10 +218,10 @@ func (t *JobTest) Test_getWrappedFunc() { t.onceJob() fn := t.job.getWrappedFunc("job", t.testLogFunc()) require.NotNil(t.T(), fn) - fn() - assert.True(t.T(), t.executed) - assert.False(t.T(), t.errored()) - assert.False(t.T(), t.panicked()) + go fn() + assert.True(t.T(), t.executed(time.Millisecond, false)) + assert.False(t.T(), t.errored(time.Millisecond)) + assert.False(t.T(), t.panicked(time.Millisecond)) } func (t *JobTest) Test_getWrappedFuncError() { @@ -180,10 +233,10 @@ func (t *JobTest) Test_getWrappedFuncError() { t.onceErrorJob() fn := t.job.getWrappedFunc("job", t.testLogFunc()) require.NotNil(t.T(), fn) - fn() - assert.True(t.T(), t.executed) - assert.True(t.T(), t.errored()) - assert.False(t.T(), t.panicked()) + go fn() + assert.True(t.T(), t.executed(time.Millisecond, false)) + assert.True(t.T(), t.errored(time.Millisecond)) + assert.False(t.T(), t.panicked(time.Millisecond)) } func (t *JobTest) Test_getWrappedFuncPanic() { @@ -195,32 +248,91 @@ func (t *JobTest) Test_getWrappedFuncPanic() { t.oncePanicJob() fn := t.job.getWrappedFunc("job", t.testLogFunc()) require.NotNil(t.T(), fn) - fn() - assert.True(t.T(), t.executed) - assert.False(t.T(), t.errored()) - assert.True(t.T(), t.panicked()) + go fn() + assert.True(t.T(), t.executed(time.Millisecond, false)) + assert.False(t.T(), t.errored(time.Millisecond)) + assert.True(t.T(), t.panicked(time.Millisecond)) } -func (t *JobTest) Test_getWrappedTimerFunc() { +// func (t *JobTest) Test_getWrappedTimerFunc() { +// defer func() { +// require.Nil(t.T(), recover()) +// }() +// +// t.clear() +// t.regularJob() +// t.job.run("job", t.testLogFunc()) +// time.Sleep(time.Millisecond * 5) +// require.True(t.T(), t.executed(time.Millisecond, false)) +// first := 0 +// +// select { +// case c := <-t.randomNumber: +// first = c +// t.randomNumber = make(chan int) +// case <-time.After(time.Millisecond * 2): +// first = 0 +// } +// +// require.NotEqual(t.T(), 0, first) +// second := 0 +// +// select { +// case c := <-t.randomNumber: +// second = c +// t.randomNumber = make(chan int) +// case <-time.After(time.Millisecond * 2): +// second = 0 +// } +// +// require.NotEqual(t.T(), 0, second) +// assert.NotEqual(t.T(), first, second) +// } + +func (t *JobTest) Test_runOnce() { defer func() { require.Nil(t.T(), recover()) }() t.clear() t.regularJob() - t.job.run("job", t.testLogFunc()) - time.Sleep(time.Millisecond) - assert.True(t.T(), t.executed) - t.executed = false - time.Sleep(time.Millisecond) - if !t.executed { - t.clear() - t.T().Skip("job wasn't as fast as it should be! this may be an error, but also can be just bad timing") + t.job.runOnce("job", t.testLogFunc()) + time.Sleep(time.Millisecond * 5) + require.True(t.T(), t.executed(time.Millisecond, false)) + first := 0 + + select { + case c := <-t.randomNumber: + first = c + t.randomNumber = make(chan int) + case <-time.After(time.Millisecond * 2): + first = 0 } - t.job.stop() - time.Sleep(time.Nanosecond * 10) + + require.NotEqual(t.T(), 0, first) + second := 0 + + select { + case c := <-t.randomNumber: + second = c + t.randomNumber = make(chan int) + case <-time.After(time.Millisecond * 2): + second = 0 + } + + assert.Equal(t.T(), 0, second) +} + +func (t *JobTest) Test_runOnceSync() { + defer func() { + require.Nil(t.T(), recover()) + }() + t.clear() - assert.False(t.T(), t.executed) + t.regularSyncJob() + require.False(t.T(), t.syncBool) + t.job.runOnceSync("job", t.testLogFunc()) + assert.True(t.T(), t.syncBool) } func (t *JobManagerTest) SetupSuite() { @@ -270,6 +382,41 @@ func (t *JobManagerTest) Test_RegisterJobAlreadyExists() { assert.EqualError(t.T(), err, "job already exists") } +func (t *JobManagerTest) Test_FetchJobDoesntExist() { + require.NotNil(t.T(), t.manager.jobs) + _, ok := t.manager.FetchJob("doesn't exist") + assert.False(t.T(), ok) +} + +func (t *JobManagerTest) Test_FetchJob() { + defer func() { + require.Nil(t.T(), recover()) + }() + + require.NoError(t.T(), t.manager.RegisterJob("test_fetch", &Job{Command: func(logFunc JobLogFunc) error { + return nil + }})) + require.NotNil(t.T(), t.manager.jobs) + job, ok := t.manager.FetchJob("test_fetch") + assert.True(t.T(), ok) + require.NotNil(t.T(), job) + assert.NotNil(t.T(), job.Command) + _ = t.manager.UnregisterJob("test_fetch") +} + +func (t *JobManagerTest) Test_UpdateJobDoesntExist() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.UpdateJob("doesn't exist", &Job{}) + assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") +} + +func (t *JobManagerTest) Test_UpdateJob() { + require.NotNil(t.T(), t.manager.jobs) + job, _ := t.manager.FetchJob("job") + err := t.manager.UpdateJob("job", job) + assert.NoError(t.T(), err) +} + func (t *JobManagerTest) Test_RunOnceSync() { require.NotNil(t.T(), t.manager.jobs) t.runnerFlag = false @@ -277,3 +424,9 @@ func (t *JobManagerTest) Test_RunOnceSync() { require.NoError(t.T(), err) assert.True(t.T(), t.runnerFlag) } + +func (t *JobManagerTest) Test_UnregisterJobDoesntExist() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.UnregisterJob("doesn't exist") + assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") +} From 9801b4d47ff9f99e154bf5f41ec74875e80bcc28 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 13:50:30 +0300 Subject: [PATCH 4/8] fixes for tests --- .travis.yml | 2 +- core/job_manager.go | 14 +++-- core/job_manager_test.go | 126 +++++++++++++++++++++++++++++++++------ 3 files changed, 116 insertions(+), 26 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4821ce2..468d092 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,6 @@ go: before_install: - go mod tidy script: - - go test ./... -v -cpu 2 -timeout 1m -race -cover -coverprofile=coverage.txt -covermode=atomic + - go test ./... -v -cpu 2 -timeout 2m -race -cover -coverprofile=coverage.txt -covermode=atomic after_success: - bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/core/job_manager.go b/core/job_manager.go index 6d8133f..b489aad 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -57,11 +57,11 @@ func (j *Job) getWrappedFunc(name string, log JobLogFunc) func() { } // getWrappedTimerFunc returns job timer func to run in the separate goroutine -func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func() { - return func() { +func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func(chan bool) { + return func(stopChannel chan bool) { for range time.NewTicker(j.Interval).C { select { - case <-j.stopChannel: + case <-stopChannel: return default: j.getWrappedFunc(name, log)() @@ -74,7 +74,7 @@ func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func() { func (j *Job) run(name string, log JobLogFunc) *Job { if j.Regular && j.Interval > 0 && !j.active { j.stopChannel = make(chan bool) - go j.getWrappedTimerFunc(name, log)() + go j.getWrappedTimerFunc(name, log)(j.stopChannel) j.active = true } @@ -84,8 +84,10 @@ func (j *Job) run(name string, log JobLogFunc) *Job { // stop running job func (j *Job) stop() *Job { if j.active && j.stopChannel != nil { - j.stopChannel <- true - j.active = false + go func() { + j.stopChannel <- true + j.active = false + }() } return j diff --git a/core/job_manager_test.go b/core/job_manager_test.go index b000989..be3d199 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -27,8 +27,9 @@ type JobTest struct { type JobManagerTest struct { suite.Suite - manager *JobManager - runnerFlag bool + manager *JobManager + runnerFlag chan bool + syncRunnerFlag bool } func TestJob(t *testing.T) { @@ -137,7 +138,6 @@ func (t *JobTest) clear() { } func (t *JobTest) onceJob() { - t.clear() t.job = &Job{ Command: func(logFunc JobLogFunc) error { t.executedChan <- true @@ -151,7 +151,6 @@ func (t *JobTest) onceJob() { } func (t *JobTest) onceErrorJob() { - t.clear() t.job = &Job{ Command: func(logFunc JobLogFunc) error { t.executedChan <- true @@ -165,7 +164,6 @@ func (t *JobTest) onceErrorJob() { } func (t *JobTest) oncePanicJob() { - t.clear() t.job = &Job{ Command: func(logFunc JobLogFunc) error { t.executedChan <- true @@ -179,7 +177,6 @@ func (t *JobTest) oncePanicJob() { } func (t *JobTest) regularJob() { - t.clear() rand.Seed(time.Now().UnixNano()) t.job = &Job{ Command: func(logFunc JobLogFunc) error { @@ -195,7 +192,6 @@ func (t *JobTest) regularJob() { } func (t *JobTest) regularSyncJob() { - t.clear() rand.Seed(time.Now().UnixNano()) t.job = &Job{ Command: func(logFunc JobLogFunc) error { @@ -259,7 +255,6 @@ func (t *JobTest) Test_getWrappedFuncPanic() { // require.Nil(t.T(), recover()) // }() // -// t.clear() // t.regularJob() // t.job.run("job", t.testLogFunc()) // time.Sleep(time.Millisecond * 5) @@ -289,12 +284,23 @@ func (t *JobTest) Test_getWrappedFuncPanic() { // assert.NotEqual(t.T(), first, second) // } +func (t *JobTest) Test_run() { + defer func() { + require.Nil(t.T(), recover()) + }() + + t.regularJob() + t.job.run("job", t.testLogFunc()) + time.Sleep(time.Millisecond * 5) + t.job.stop() + require.True(t.T(), t.executed(time.Millisecond, false)) +} + func (t *JobTest) Test_runOnce() { defer func() { require.Nil(t.T(), recover()) }() - t.clear() t.regularJob() t.job.runOnce("job", t.testLogFunc()) time.Sleep(time.Millisecond * 5) @@ -304,23 +310,20 @@ func (t *JobTest) Test_runOnce() { select { case c := <-t.randomNumber: first = c - t.randomNumber = make(chan int) case <-time.After(time.Millisecond * 2): first = 0 } - require.NotEqual(t.T(), 0, first) second := 0 select { case c := <-t.randomNumber: second = c - t.randomNumber = make(chan int) case <-time.After(time.Millisecond * 2): second = 0 } - assert.Equal(t.T(), 0, second) + assert.NotEqual(t.T(), first, second) } func (t *JobTest) Test_runOnceSync() { @@ -339,6 +342,19 @@ func (t *JobManagerTest) SetupSuite() { t.manager = NewJobManager() } +func (t *JobManagerTest) ranFlag() bool { + if t.runnerFlag == nil { + return false + } + + select { + case c := <-t.runnerFlag: + return c + case <-time.After(time.Millisecond): + return false + } +} + func (t *JobManagerTest) Test_SetLogger() { t.manager.logger = nil t.manager.SetLogger(NewLogger("test", logging.ERROR, DefaultLogFormatter())) @@ -367,13 +383,24 @@ func (t *JobManagerTest) Test_RegisterJob() { require.NotNil(t.T(), t.manager.jobs) err := t.manager.RegisterJob("job", &Job{ Command: func(log JobLogFunc) error { - t.runnerFlag = true + t.runnerFlag <- true return nil }, ErrorHandler: DefaultJobErrorHandler(), PanicHandler: DefaultJobPanicHandler(), }) assert.NoError(t.T(), err) + err = t.manager.RegisterJob("job_regular", &Job{ + Command: func(log JobLogFunc) error { + t.runnerFlag <- true + return nil + }, + ErrorHandler: DefaultJobErrorHandler(), + PanicHandler: DefaultJobPanicHandler(), + Regular: true, + Interval: time.Millisecond, + }) + assert.NoError(t.T(), err) } func (t *JobManagerTest) Test_RegisterJobAlreadyExists() { @@ -417,16 +444,77 @@ func (t *JobManagerTest) Test_UpdateJob() { assert.NoError(t.T(), err) } -func (t *JobManagerTest) Test_RunOnceSync() { +func (t *JobManagerTest) Test_RunJobDoesntExist() { require.NotNil(t.T(), t.manager.jobs) - t.runnerFlag = false - err := t.manager.RunJobOnceSync("job") - require.NoError(t.T(), err) - assert.True(t.T(), t.runnerFlag) + err := t.manager.RunJob("doesn't exist") + assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") } +func (t *JobManagerTest) Test_RunJob() { + require.NotNil(t.T(), t.manager.jobs) + t.runnerFlag = make(chan bool) + err := t.manager.RunJob("job_regular") + require.NoError(t.T(), err) + time.Sleep(time.Millisecond * 5) + assert.True(t.T(), <-t.runnerFlag) + err = t.manager.StopJob("job_regular") + require.NoError(t.T(), err) +} + +func (t *JobManagerTest) Test_RunJobOnceDoesntExist() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RunJobOnce("doesn't exist") + assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") +} + +// func (t *JobManagerTest) Test_RunJobOnce() { +// require.NotNil(t.T(), t.manager.jobs) +// t.runnerFlag = make(chan bool) +// err := t.manager.RunJobOnce("job") +// require.NoError(t.T(), err) +// assert.True(t.T(), t.ranFlag()) +// } + +func (t *JobManagerTest) Test_RunJobOnceSyncDoesntExist() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RunJobOnceSync("doesn't exist") + assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") +} + +// func (t *JobManagerTest) Test_RunJobOnceSync() { +// require.NotNil(t.T(), t.manager.jobs) +// t.runnerFlag = make(chan bool) +// err := t.manager.RunJobOnceSync("job") +// require.NoError(t.T(), err) +// go func() { +// assert.True(t.T(), t.ranFlag()) +// }() +// } + func (t *JobManagerTest) Test_UnregisterJobDoesntExist() { require.NotNil(t.T(), t.manager.jobs) err := t.manager.UnregisterJob("doesn't exist") assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") } + +func (t *JobManagerTest) Test_log() { + defer func() { + require.Nil(t.T(), recover()) + }() + + testLog := func() { + t.manager.log("test", logging.CRITICAL) + t.manager.log("test", logging.ERROR) + t.manager.log("test", logging.WARNING) + t.manager.log("test", logging.NOTICE) + t.manager.log("test", logging.INFO) + t.manager.log("test", logging.DEBUG) + } + t.manager.SetLogging(false) + testLog() + t.manager.SetLogging(true) + t.manager.logger = nil + testLog() + t.manager.logger = NewLogger("test", logging.DEBUG, DefaultLogFormatter()) + testLog() +} From f395f06363cafa087e55c6d419cd3eaf1989d8a1 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 13:57:39 +0300 Subject: [PATCH 5/8] more tests --- core/job_manager_test.go | 72 ++++++++++++---------------------------- 1 file changed, 22 insertions(+), 50 deletions(-) diff --git a/core/job_manager_test.go b/core/job_manager_test.go index be3d199..14f0d5d 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -250,40 +250,6 @@ func (t *JobTest) Test_getWrappedFuncPanic() { assert.True(t.T(), t.panicked(time.Millisecond)) } -// func (t *JobTest) Test_getWrappedTimerFunc() { -// defer func() { -// require.Nil(t.T(), recover()) -// }() -// -// t.regularJob() -// t.job.run("job", t.testLogFunc()) -// time.Sleep(time.Millisecond * 5) -// require.True(t.T(), t.executed(time.Millisecond, false)) -// first := 0 -// -// select { -// case c := <-t.randomNumber: -// first = c -// t.randomNumber = make(chan int) -// case <-time.After(time.Millisecond * 2): -// first = 0 -// } -// -// require.NotEqual(t.T(), 0, first) -// second := 0 -// -// select { -// case c := <-t.randomNumber: -// second = c -// t.randomNumber = make(chan int) -// case <-time.After(time.Millisecond * 2): -// second = 0 -// } -// -// require.NotEqual(t.T(), 0, second) -// assert.NotEqual(t.T(), first, second) -// } - func (t *JobTest) Test_run() { defer func() { require.Nil(t.T(), recover()) @@ -401,6 +367,15 @@ func (t *JobManagerTest) Test_RegisterJob() { Interval: time.Millisecond, }) assert.NoError(t.T(), err) + err = t.manager.RegisterJob("job_sync", &Job{ + Command: func(log JobLogFunc) error { + t.syncRunnerFlag = true + return nil + }, + ErrorHandler: DefaultJobErrorHandler(), + PanicHandler: DefaultJobPanicHandler(), + }) + assert.NoError(t.T(), err) } func (t *JobManagerTest) Test_RegisterJobAlreadyExists() { @@ -467,13 +442,13 @@ func (t *JobManagerTest) Test_RunJobOnceDoesntExist() { assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") } -// func (t *JobManagerTest) Test_RunJobOnce() { -// require.NotNil(t.T(), t.manager.jobs) -// t.runnerFlag = make(chan bool) -// err := t.manager.RunJobOnce("job") -// require.NoError(t.T(), err) -// assert.True(t.T(), t.ranFlag()) -// } +func (t *JobManagerTest) Test_RunJobOnce() { + require.NotNil(t.T(), t.manager.jobs) + go func() { t.runnerFlag <- false }() + err := t.manager.RunJobOnce("job") + require.NoError(t.T(), err) + assert.True(t.T(), t.ranFlag()) +} func (t *JobManagerTest) Test_RunJobOnceSyncDoesntExist() { require.NotNil(t.T(), t.manager.jobs) @@ -481,15 +456,12 @@ func (t *JobManagerTest) Test_RunJobOnceSyncDoesntExist() { assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") } -// func (t *JobManagerTest) Test_RunJobOnceSync() { -// require.NotNil(t.T(), t.manager.jobs) -// t.runnerFlag = make(chan bool) -// err := t.manager.RunJobOnceSync("job") -// require.NoError(t.T(), err) -// go func() { -// assert.True(t.T(), t.ranFlag()) -// }() -// } +func (t *JobManagerTest) Test_RunJobOnceSync() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.RunJobOnceSync("job_sync") + require.NoError(t.T(), err) + assert.True(t.T(), t.syncRunnerFlag) +} func (t *JobManagerTest) Test_UnregisterJobDoesntExist() { require.NotNil(t.T(), t.manager.jobs) From c3d24f3298f7990522beaaa9f44e0f21f8a7ca57 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 14:02:05 +0300 Subject: [PATCH 6/8] additional test for JobManager --- core/job_manager_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/job_manager_test.go b/core/job_manager_test.go index 14f0d5d..5e626c1 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -419,6 +419,12 @@ func (t *JobManagerTest) Test_UpdateJob() { assert.NoError(t.T(), err) } +func (t *JobManagerTest) Test_StopJobDoesntExist() { + require.NotNil(t.T(), t.manager.jobs) + err := t.manager.StopJob("doesn't exist") + assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") +} + func (t *JobManagerTest) Test_RunJobDoesntExist() { require.NotNil(t.T(), t.manager.jobs) err := t.manager.RunJob("doesn't exist") From d7bad94495c3653e558d4839bce76702dc1cb146 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 14:11:29 +0300 Subject: [PATCH 7/8] method to start all regular jobs in manager --- core/job_manager.go | 26 +++++++++++++++++++++++++- core/job_manager_test.go | 17 +++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/core/job_manager.go b/core/job_manager.go index b489aad..c30d067 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -34,7 +34,21 @@ type Job struct { // JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as // singleton), or jobs can be executed as regular jobs. Example initialization: -// TODO example initialization +// manager := NewJobManager(). +// SetLogger(logger). +// SetLogging(false) +// _ = manager.RegisterJob("updateTokens", &Job{ +// Command: func(logFunc JobLogFunc) error { +// // logic goes here... +// logFunc("All tokens were updated successfully", logging.INFO) +// return nil +// }, +// ErrorHandler: DefaultJobErrorHandler(), +// PanicHandler: DefaultJobPanicHandler(), +// Interval: time.Hour * 3, +// Regular: true, +// }) +// manager.Start() type JobManager struct { jobs *sync.Map enableLogging bool @@ -229,6 +243,16 @@ func (j *JobManager) RunJobOnceSync(name string) error { return fmt.Errorf("cannot find job `%s`", name) } +// Start all jobs in the manager +func (j *JobManager) Start() { + j.jobs.Range(func(key, value interface{}) bool { + name := key.(string) + job := value.(*Job) + job.run(name, j.log) + return true + }) +} + // log logs via logger or as plaintext func (j *JobManager) log(format string, severity logging.Level, args ...interface{}) { if !j.enableLogging { diff --git a/core/job_manager_test.go b/core/job_manager_test.go index 5e626c1..fc994dc 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -475,6 +475,23 @@ func (t *JobManagerTest) Test_UnregisterJobDoesntExist() { assert.EqualError(t.T(), err, "cannot find job `doesn't exist`") } +func (t *JobManagerTest) Test_Start() { + defer func() { + require.Nil(t.T(), recover()) + }() + + manager := NewJobManager() + _ = manager.RegisterJob("job", &Job{ + Command: func(logFunc JobLogFunc) error { + logFunc("alive!", logging.INFO) + return nil + }, + ErrorHandler: DefaultJobErrorHandler(), + PanicHandler: DefaultJobPanicHandler(), + }) + manager.Start() +} + func (t *JobManagerTest) Test_log() { defer func() { require.Nil(t.T(), recover()) From cd31e6ea9e9ce05617ee3a547d6a6b39b27d7a2f Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 17 Dec 2019 14:59:53 +0300 Subject: [PATCH 8/8] prevent duplicating log items --- core/job_manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/job_manager.go b/core/job_manager.go index c30d067..76bd8d1 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -274,6 +274,8 @@ func (j *JobManager) log(format string, severity logging.Level, args ...interfac case logging.DEBUG: j.logger.Debugf(format, args...) } + + return } switch severity {