mirror of
https://github.com/retailcrm/mg-transport-core.git
synced 2024-11-29 00:25:31 +03:00
commit
a0d6a13e77
@ -7,6 +7,6 @@ go:
|
|||||||
before_install:
|
before_install:
|
||||||
- go mod tidy
|
- go mod tidy
|
||||||
script:
|
script:
|
||||||
- go test ./... -v -cpu 2 -race -cover -coverprofile=coverage.txt -covermode=atomic
|
- go test ./... -v -cpu 2 -timeout 2m -race -cover -coverprofile=coverage.txt -covermode=atomic
|
||||||
after_success:
|
after_success:
|
||||||
- bash <(curl -s https://codecov.io/bash)
|
- bash <(curl -s https://codecov.io/bash)
|
@ -21,6 +21,7 @@ type Engine struct {
|
|||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
Logger *logging.Logger
|
Logger *logging.Logger
|
||||||
csrf *CSRF
|
csrf *CSRF
|
||||||
|
jobManager *JobManager
|
||||||
Sessions sessions.Store
|
Sessions sessions.Store
|
||||||
Config ConfigInterface
|
Config ConfigInterface
|
||||||
LogFormatter logging.Formatter
|
LogFormatter logging.Formatter
|
||||||
@ -134,6 +135,15 @@ func (e *Engine) Router() *gin.Engine {
|
|||||||
return e.ginEngine
|
return e.ginEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// JobManager will return singleton JobManager from Engine
|
||||||
|
func (e *Engine) JobManager() *JobManager {
|
||||||
|
if e.jobManager == nil {
|
||||||
|
e.jobManager = NewJobManager().SetLogger(e.Logger).SetLogging(e.Config.IsDebug())
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.jobManager
|
||||||
|
}
|
||||||
|
|
||||||
// BuildHTTPClient builds HTTP client with provided configuration
|
// BuildHTTPClient builds HTTP client with provided configuration
|
||||||
func (e *Engine) BuildHTTPClient(replaceDefault ...bool) *Engine {
|
func (e *Engine) BuildHTTPClient(replaceDefault ...bool) *Engine {
|
||||||
if e.Config.GetHTTPClientConfig() != nil {
|
if e.Config.GetHTTPClientConfig() != nil {
|
||||||
|
@ -142,6 +142,17 @@ func (e *EngineTest) Test_Router() {
|
|||||||
assert.NotNil(e.T(), e.engine.Router())
|
assert.NotNil(e.T(), e.engine.Router())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *EngineTest) Test_JobManager() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(e.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.Nil(e.T(), e.engine.jobManager)
|
||||||
|
manager := e.engine.JobManager()
|
||||||
|
require.NotNil(e.T(), manager)
|
||||||
|
assert.Equal(e.T(), manager, e.engine.JobManager())
|
||||||
|
}
|
||||||
|
|
||||||
func (e *EngineTest) Test_ConfigureRouter() {
|
func (e *EngineTest) Test_ConfigureRouter() {
|
||||||
e.engine.TranslationsPath = testTranslationsDir
|
e.engine.TranslationsPath = testTranslationsDir
|
||||||
e.engine.Prepare()
|
e.engine.Prepare()
|
||||||
|
295
core/job_manager.go
Normal file
295
core/job_manager.go
Normal file
@ -0,0 +1,295 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/op/go-logging"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JobFunc is empty func which should be executed in a parallel goroutine
|
||||||
|
type JobFunc func(JobLogFunc) error
|
||||||
|
|
||||||
|
// JobLogFunc is a function which logs data from job
|
||||||
|
type JobLogFunc func(string, logging.Level, ...interface{})
|
||||||
|
|
||||||
|
// JobErrorHandler is a function to handle jobs errors. First argument is a job name.
|
||||||
|
type JobErrorHandler func(string, error, JobLogFunc)
|
||||||
|
|
||||||
|
// JobPanicHandler is a function to handle jobs panics. First argument is a job name.
|
||||||
|
type JobPanicHandler func(string, interface{}, JobLogFunc)
|
||||||
|
|
||||||
|
// Job represents single job. Regular job will be executed every Interval.
|
||||||
|
type Job struct {
|
||||||
|
Command JobFunc
|
||||||
|
ErrorHandler JobErrorHandler
|
||||||
|
PanicHandler JobPanicHandler
|
||||||
|
Interval time.Duration
|
||||||
|
Regular bool
|
||||||
|
active bool
|
||||||
|
stopChannel chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as
|
||||||
|
// singleton), or jobs can be executed as regular jobs. Example initialization:
|
||||||
|
// manager := NewJobManager().
|
||||||
|
// SetLogger(logger).
|
||||||
|
// SetLogging(false)
|
||||||
|
// _ = manager.RegisterJob("updateTokens", &Job{
|
||||||
|
// Command: func(logFunc JobLogFunc) error {
|
||||||
|
// // logic goes here...
|
||||||
|
// logFunc("All tokens were updated successfully", logging.INFO)
|
||||||
|
// return nil
|
||||||
|
// },
|
||||||
|
// ErrorHandler: DefaultJobErrorHandler(),
|
||||||
|
// PanicHandler: DefaultJobPanicHandler(),
|
||||||
|
// Interval: time.Hour * 3,
|
||||||
|
// Regular: true,
|
||||||
|
// })
|
||||||
|
// manager.Start()
|
||||||
|
type JobManager struct {
|
||||||
|
jobs *sync.Map
|
||||||
|
enableLogging bool
|
||||||
|
logger *logging.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// getWrappedFunc wraps job into function
|
||||||
|
func (j *Job) getWrappedFunc(name string, log JobLogFunc) func() {
|
||||||
|
return func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil && j.PanicHandler != nil {
|
||||||
|
j.PanicHandler(name, r, log)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := j.Command(log); err != nil && j.ErrorHandler != nil {
|
||||||
|
j.ErrorHandler(name, err, log)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getWrappedTimerFunc returns job timer func to run in the separate goroutine
|
||||||
|
func (j *Job) getWrappedTimerFunc(name string, log JobLogFunc) func(chan bool) {
|
||||||
|
return func(stopChannel chan bool) {
|
||||||
|
for range time.NewTicker(j.Interval).C {
|
||||||
|
select {
|
||||||
|
case <-stopChannel:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
j.getWrappedFunc(name, log)()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run job
|
||||||
|
func (j *Job) run(name string, log JobLogFunc) *Job {
|
||||||
|
if j.Regular && j.Interval > 0 && !j.active {
|
||||||
|
j.stopChannel = make(chan bool)
|
||||||
|
go j.getWrappedTimerFunc(name, log)(j.stopChannel)
|
||||||
|
j.active = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop running job
|
||||||
|
func (j *Job) stop() *Job {
|
||||||
|
if j.active && j.stopChannel != nil {
|
||||||
|
go func() {
|
||||||
|
j.stopChannel <- true
|
||||||
|
j.active = false
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// runOnce run job once
|
||||||
|
func (j *Job) runOnce(name string, log JobLogFunc) *Job {
|
||||||
|
go j.getWrappedFunc(name, log)()
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// runOnceSync run job once in current goroutine
|
||||||
|
func (j *Job) runOnceSync(name string, log JobLogFunc) *Job {
|
||||||
|
j.getWrappedFunc(name, log)()
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJobManager is a JobManager constructor
|
||||||
|
func NewJobManager() *JobManager {
|
||||||
|
return &JobManager{jobs: &sync.Map{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultJobErrorHandler returns default error handler for a job
|
||||||
|
func DefaultJobErrorHandler() JobErrorHandler {
|
||||||
|
return func(name string, err error, log JobLogFunc) {
|
||||||
|
if err != nil && name != "" {
|
||||||
|
log("Job `%s` errored with an error: `%s`", logging.ERROR, name, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultJobPanicHandler returns default panic handler for a job
|
||||||
|
func DefaultJobPanicHandler() JobPanicHandler {
|
||||||
|
return func(name string, recoverValue interface{}, log JobLogFunc) {
|
||||||
|
if recoverValue != nil && name != "" {
|
||||||
|
log("Job `%s` panicked with value: `%#v`", logging.ERROR, name, recoverValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLogger sets logger into JobManager
|
||||||
|
func (j *JobManager) SetLogger(logger *logging.Logger) *JobManager {
|
||||||
|
if logger != nil {
|
||||||
|
j.logger = logger
|
||||||
|
}
|
||||||
|
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLogging enables or disables JobManager logging
|
||||||
|
func (j *JobManager) SetLogging(enableLogging bool) *JobManager {
|
||||||
|
j.enableLogging = enableLogging
|
||||||
|
return j
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterJob registers new job
|
||||||
|
func (j *JobManager) RegisterJob(name string, job *Job) error {
|
||||||
|
if job == nil {
|
||||||
|
return errors.New("job shouldn't be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := j.FetchJob(name); ok {
|
||||||
|
return errors.New("job already exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
j.jobs.Store(name, job)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnregisterJob unregisters job if it's exists. Returns error if job doesn't exist.
|
||||||
|
func (j *JobManager) UnregisterJob(name string) error {
|
||||||
|
if i, ok := j.FetchJob(name); ok {
|
||||||
|
i.stop()
|
||||||
|
j.jobs.Delete(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot find job `%s`", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchJob fetches already exist job
|
||||||
|
func (j *JobManager) FetchJob(name string) (value *Job, ok bool) {
|
||||||
|
if i, ok := j.jobs.Load(name); ok {
|
||||||
|
if job, ok := i.(*Job); ok {
|
||||||
|
return job, ok
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Job{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateJob updates job
|
||||||
|
func (j *JobManager) UpdateJob(name string, job *Job) error {
|
||||||
|
if job, ok := j.FetchJob(name); ok {
|
||||||
|
_ = j.UnregisterJob(name)
|
||||||
|
return j.RegisterJob(name, job)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot find job `%s`", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunJob starts provided regular job if it's exists. It's async operation and error returns only of job wasn't executed at all.
|
||||||
|
func (j *JobManager) RunJob(name string) error {
|
||||||
|
if job, ok := j.FetchJob(name); ok {
|
||||||
|
job.run(name, j.log)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot find job `%s`", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopJob stops provided regular regular job if it's exists.
|
||||||
|
func (j *JobManager) StopJob(name string) error {
|
||||||
|
if job, ok := j.FetchJob(name); ok {
|
||||||
|
job.stop()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot find job `%s`", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunJobOnce starts provided job once if it exists. It's also async.
|
||||||
|
func (j *JobManager) RunJobOnce(name string) error {
|
||||||
|
if job, ok := j.FetchJob(name); ok {
|
||||||
|
job.runOnce(name, j.log)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot find job `%s`", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work.
|
||||||
|
func (j *JobManager) RunJobOnceSync(name string) error {
|
||||||
|
if job, ok := j.FetchJob(name); ok {
|
||||||
|
job.runOnceSync(name, j.log)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot find job `%s`", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start all jobs in the manager
|
||||||
|
func (j *JobManager) Start() {
|
||||||
|
j.jobs.Range(func(key, value interface{}) bool {
|
||||||
|
name := key.(string)
|
||||||
|
job := value.(*Job)
|
||||||
|
job.run(name, j.log)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// log logs via logger or as plaintext
|
||||||
|
func (j *JobManager) log(format string, severity logging.Level, args ...interface{}) {
|
||||||
|
if !j.enableLogging {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if j.logger != nil {
|
||||||
|
switch severity {
|
||||||
|
case logging.CRITICAL:
|
||||||
|
j.logger.Criticalf(format, args...)
|
||||||
|
case logging.ERROR:
|
||||||
|
j.logger.Errorf(format, args...)
|
||||||
|
case logging.WARNING:
|
||||||
|
j.logger.Warningf(format, args...)
|
||||||
|
case logging.NOTICE:
|
||||||
|
j.logger.Noticef(format, args...)
|
||||||
|
case logging.INFO:
|
||||||
|
j.logger.Infof(format, args...)
|
||||||
|
case logging.DEBUG:
|
||||||
|
j.logger.Debugf(format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch severity {
|
||||||
|
case logging.CRITICAL:
|
||||||
|
fmt.Print("[CRITICAL] ", fmt.Sprintf(format, args...))
|
||||||
|
case logging.ERROR:
|
||||||
|
fmt.Print("[ERROR] ", fmt.Sprintf(format, args...))
|
||||||
|
case logging.WARNING:
|
||||||
|
fmt.Print("[WARNING] ", fmt.Sprintf(format, args...))
|
||||||
|
case logging.NOTICE:
|
||||||
|
fmt.Print("[NOTICE] ", fmt.Sprintf(format, args...))
|
||||||
|
case logging.INFO:
|
||||||
|
fmt.Print("[INFO] ", fmt.Sprintf(format, args...))
|
||||||
|
case logging.DEBUG:
|
||||||
|
fmt.Print("[DEBUG] ", fmt.Sprintf(format, args...))
|
||||||
|
}
|
||||||
|
}
|
515
core/job_manager_test.go
Normal file
515
core/job_manager_test.go
Normal file
@ -0,0 +1,515 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/op/go-logging"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobTest struct {
|
||||||
|
suite.Suite
|
||||||
|
job *Job
|
||||||
|
syncBool bool
|
||||||
|
executedChan chan bool
|
||||||
|
randomNumber chan int
|
||||||
|
executeErr chan error
|
||||||
|
panicValue chan interface{}
|
||||||
|
lastLog string
|
||||||
|
lastMsgLevel logging.Level
|
||||||
|
}
|
||||||
|
|
||||||
|
type JobManagerTest struct {
|
||||||
|
suite.Suite
|
||||||
|
manager *JobManager
|
||||||
|
runnerFlag chan bool
|
||||||
|
syncRunnerFlag bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJob(t *testing.T) {
|
||||||
|
suite.Run(t, new(JobTest))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobManager(t *testing.T) {
|
||||||
|
suite.Run(t, new(JobManagerTest))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDefaultJobErrorHandler(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t, recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
fn := DefaultJobErrorHandler()
|
||||||
|
require.NotNil(t, fn)
|
||||||
|
fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) {
|
||||||
|
require.Len(t, i, 2)
|
||||||
|
assert.Equal(t, fmt.Sprintf("%s", i[1]), "test")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDefaultJobPanicHandler(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t, recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
fn := DefaultJobPanicHandler()
|
||||||
|
require.NotNil(t, fn)
|
||||||
|
fn("job", errors.New("test"), func(s string, level logging.Level, i ...interface{}) {
|
||||||
|
require.Len(t, i, 2)
|
||||||
|
assert.Equal(t, fmt.Sprintf("%s", i[1]), "test")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) testErrorHandler() JobErrorHandler {
|
||||||
|
return func(name string, err error, logFunc JobLogFunc) {
|
||||||
|
t.executeErr <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) testPanicHandler() JobPanicHandler {
|
||||||
|
return func(name string, i interface{}, logFunc JobLogFunc) {
|
||||||
|
t.panicValue <- i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) testLogFunc() JobLogFunc {
|
||||||
|
return func(s string, level logging.Level, i ...interface{}) {
|
||||||
|
t.lastLog = fmt.Sprintf(s, i...)
|
||||||
|
t.lastMsgLevel = level
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) executed(wait time.Duration, defaultVal bool) bool {
|
||||||
|
if t.executedChan == nil {
|
||||||
|
return defaultVal
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-t.executedChan:
|
||||||
|
return c
|
||||||
|
case <-time.After(wait):
|
||||||
|
return defaultVal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) errored(wait time.Duration) bool {
|
||||||
|
if t.executeErr == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-t.executeErr:
|
||||||
|
return c != nil
|
||||||
|
case <-time.After(wait):
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) panicked(wait time.Duration) bool {
|
||||||
|
if t.panicValue == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-t.panicValue:
|
||||||
|
return c != nil
|
||||||
|
case <-time.After(wait):
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) clear() {
|
||||||
|
if t.job != nil {
|
||||||
|
t.job.stop()
|
||||||
|
t.job = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
t.syncBool = false
|
||||||
|
t.randomNumber = make(chan int)
|
||||||
|
t.executedChan = make(chan bool)
|
||||||
|
t.executeErr = make(chan error)
|
||||||
|
t.panicValue = make(chan interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) onceJob() {
|
||||||
|
t.job = &Job{
|
||||||
|
Command: func(logFunc JobLogFunc) error {
|
||||||
|
t.executedChan <- true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: t.testErrorHandler(),
|
||||||
|
PanicHandler: t.testPanicHandler(),
|
||||||
|
Interval: 0,
|
||||||
|
Regular: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) onceErrorJob() {
|
||||||
|
t.job = &Job{
|
||||||
|
Command: func(logFunc JobLogFunc) error {
|
||||||
|
t.executedChan <- true
|
||||||
|
return errors.New("test error")
|
||||||
|
},
|
||||||
|
ErrorHandler: t.testErrorHandler(),
|
||||||
|
PanicHandler: t.testPanicHandler(),
|
||||||
|
Interval: 0,
|
||||||
|
Regular: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) oncePanicJob() {
|
||||||
|
t.job = &Job{
|
||||||
|
Command: func(logFunc JobLogFunc) error {
|
||||||
|
t.executedChan <- true
|
||||||
|
panic("test panic")
|
||||||
|
},
|
||||||
|
ErrorHandler: t.testErrorHandler(),
|
||||||
|
PanicHandler: t.testPanicHandler(),
|
||||||
|
Interval: 0,
|
||||||
|
Regular: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) regularJob() {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
t.job = &Job{
|
||||||
|
Command: func(logFunc JobLogFunc) error {
|
||||||
|
t.executedChan <- true
|
||||||
|
t.randomNumber <- rand.Int()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: t.testErrorHandler(),
|
||||||
|
PanicHandler: t.testPanicHandler(),
|
||||||
|
Interval: time.Millisecond,
|
||||||
|
Regular: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) regularSyncJob() {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
t.job = &Job{
|
||||||
|
Command: func(logFunc JobLogFunc) error {
|
||||||
|
t.syncBool = true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: t.testErrorHandler(),
|
||||||
|
PanicHandler: t.testPanicHandler(),
|
||||||
|
Interval: time.Millisecond,
|
||||||
|
Regular: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) Test_getWrappedFunc() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.clear()
|
||||||
|
t.onceJob()
|
||||||
|
fn := t.job.getWrappedFunc("job", t.testLogFunc())
|
||||||
|
require.NotNil(t.T(), fn)
|
||||||
|
go fn()
|
||||||
|
assert.True(t.T(), t.executed(time.Millisecond, false))
|
||||||
|
assert.False(t.T(), t.errored(time.Millisecond))
|
||||||
|
assert.False(t.T(), t.panicked(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) Test_getWrappedFuncError() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.clear()
|
||||||
|
t.onceErrorJob()
|
||||||
|
fn := t.job.getWrappedFunc("job", t.testLogFunc())
|
||||||
|
require.NotNil(t.T(), fn)
|
||||||
|
go fn()
|
||||||
|
assert.True(t.T(), t.executed(time.Millisecond, false))
|
||||||
|
assert.True(t.T(), t.errored(time.Millisecond))
|
||||||
|
assert.False(t.T(), t.panicked(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) Test_getWrappedFuncPanic() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.clear()
|
||||||
|
t.oncePanicJob()
|
||||||
|
fn := t.job.getWrappedFunc("job", t.testLogFunc())
|
||||||
|
require.NotNil(t.T(), fn)
|
||||||
|
go fn()
|
||||||
|
assert.True(t.T(), t.executed(time.Millisecond, false))
|
||||||
|
assert.False(t.T(), t.errored(time.Millisecond))
|
||||||
|
assert.True(t.T(), t.panicked(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) Test_run() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.regularJob()
|
||||||
|
t.job.run("job", t.testLogFunc())
|
||||||
|
time.Sleep(time.Millisecond * 5)
|
||||||
|
t.job.stop()
|
||||||
|
require.True(t.T(), t.executed(time.Millisecond, false))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) Test_runOnce() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.regularJob()
|
||||||
|
t.job.runOnce("job", t.testLogFunc())
|
||||||
|
time.Sleep(time.Millisecond * 5)
|
||||||
|
require.True(t.T(), t.executed(time.Millisecond, false))
|
||||||
|
first := 0
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-t.randomNumber:
|
||||||
|
first = c
|
||||||
|
case <-time.After(time.Millisecond * 2):
|
||||||
|
first = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
second := 0
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-t.randomNumber:
|
||||||
|
second = c
|
||||||
|
case <-time.After(time.Millisecond * 2):
|
||||||
|
second = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.NotEqual(t.T(), first, second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobTest) Test_runOnceSync() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.clear()
|
||||||
|
t.regularSyncJob()
|
||||||
|
require.False(t.T(), t.syncBool)
|
||||||
|
t.job.runOnceSync("job", t.testLogFunc())
|
||||||
|
assert.True(t.T(), t.syncBool)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) SetupSuite() {
|
||||||
|
t.manager = NewJobManager()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) ranFlag() bool {
|
||||||
|
if t.runnerFlag == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c := <-t.runnerFlag:
|
||||||
|
return c
|
||||||
|
case <-time.After(time.Millisecond):
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_SetLogger() {
|
||||||
|
t.manager.logger = nil
|
||||||
|
t.manager.SetLogger(NewLogger("test", logging.ERROR, DefaultLogFormatter()))
|
||||||
|
assert.IsType(t.T(), &logging.Logger{}, t.manager.logger)
|
||||||
|
|
||||||
|
t.manager.SetLogger(nil)
|
||||||
|
assert.IsType(t.T(), &logging.Logger{}, t.manager.logger)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_SetLogging() {
|
||||||
|
t.manager.enableLogging = false
|
||||||
|
t.manager.SetLogging(true)
|
||||||
|
assert.True(t.T(), t.manager.enableLogging)
|
||||||
|
|
||||||
|
t.manager.SetLogging(false)
|
||||||
|
assert.False(t.T(), t.manager.enableLogging)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RegisterJobNil() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RegisterJob("job", nil)
|
||||||
|
assert.EqualError(t.T(), err, "job shouldn't be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RegisterJob() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RegisterJob("job", &Job{
|
||||||
|
Command: func(log JobLogFunc) error {
|
||||||
|
t.runnerFlag <- true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: DefaultJobErrorHandler(),
|
||||||
|
PanicHandler: DefaultJobPanicHandler(),
|
||||||
|
})
|
||||||
|
assert.NoError(t.T(), err)
|
||||||
|
err = t.manager.RegisterJob("job_regular", &Job{
|
||||||
|
Command: func(log JobLogFunc) error {
|
||||||
|
t.runnerFlag <- true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: DefaultJobErrorHandler(),
|
||||||
|
PanicHandler: DefaultJobPanicHandler(),
|
||||||
|
Regular: true,
|
||||||
|
Interval: time.Millisecond,
|
||||||
|
})
|
||||||
|
assert.NoError(t.T(), err)
|
||||||
|
err = t.manager.RegisterJob("job_sync", &Job{
|
||||||
|
Command: func(log JobLogFunc) error {
|
||||||
|
t.syncRunnerFlag = true
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: DefaultJobErrorHandler(),
|
||||||
|
PanicHandler: DefaultJobPanicHandler(),
|
||||||
|
})
|
||||||
|
assert.NoError(t.T(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RegisterJobAlreadyExists() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RegisterJob("job", &Job{})
|
||||||
|
assert.EqualError(t.T(), err, "job already exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_FetchJobDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
_, ok := t.manager.FetchJob("doesn't exist")
|
||||||
|
assert.False(t.T(), ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_FetchJob() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t.T(), t.manager.RegisterJob("test_fetch", &Job{Command: func(logFunc JobLogFunc) error {
|
||||||
|
return nil
|
||||||
|
}}))
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
job, ok := t.manager.FetchJob("test_fetch")
|
||||||
|
assert.True(t.T(), ok)
|
||||||
|
require.NotNil(t.T(), job)
|
||||||
|
assert.NotNil(t.T(), job.Command)
|
||||||
|
_ = t.manager.UnregisterJob("test_fetch")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_UpdateJobDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.UpdateJob("doesn't exist", &Job{})
|
||||||
|
assert.EqualError(t.T(), err, "cannot find job `doesn't exist`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_UpdateJob() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
job, _ := t.manager.FetchJob("job")
|
||||||
|
err := t.manager.UpdateJob("job", job)
|
||||||
|
assert.NoError(t.T(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_StopJobDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.StopJob("doesn't exist")
|
||||||
|
assert.EqualError(t.T(), err, "cannot find job `doesn't exist`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RunJobDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RunJob("doesn't exist")
|
||||||
|
assert.EqualError(t.T(), err, "cannot find job `doesn't exist`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RunJob() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
t.runnerFlag = make(chan bool)
|
||||||
|
err := t.manager.RunJob("job_regular")
|
||||||
|
require.NoError(t.T(), err)
|
||||||
|
time.Sleep(time.Millisecond * 5)
|
||||||
|
assert.True(t.T(), <-t.runnerFlag)
|
||||||
|
err = t.manager.StopJob("job_regular")
|
||||||
|
require.NoError(t.T(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RunJobOnceDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RunJobOnce("doesn't exist")
|
||||||
|
assert.EqualError(t.T(), err, "cannot find job `doesn't exist`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RunJobOnce() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
go func() { t.runnerFlag <- false }()
|
||||||
|
err := t.manager.RunJobOnce("job")
|
||||||
|
require.NoError(t.T(), err)
|
||||||
|
assert.True(t.T(), t.ranFlag())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RunJobOnceSyncDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RunJobOnceSync("doesn't exist")
|
||||||
|
assert.EqualError(t.T(), err, "cannot find job `doesn't exist`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_RunJobOnceSync() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.RunJobOnceSync("job_sync")
|
||||||
|
require.NoError(t.T(), err)
|
||||||
|
assert.True(t.T(), t.syncRunnerFlag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_UnregisterJobDoesntExist() {
|
||||||
|
require.NotNil(t.T(), t.manager.jobs)
|
||||||
|
err := t.manager.UnregisterJob("doesn't exist")
|
||||||
|
assert.EqualError(t.T(), err, "cannot find job `doesn't exist`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_Start() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
manager := NewJobManager()
|
||||||
|
_ = manager.RegisterJob("job", &Job{
|
||||||
|
Command: func(logFunc JobLogFunc) error {
|
||||||
|
logFunc("alive!", logging.INFO)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
ErrorHandler: DefaultJobErrorHandler(),
|
||||||
|
PanicHandler: DefaultJobPanicHandler(),
|
||||||
|
})
|
||||||
|
manager.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *JobManagerTest) Test_log() {
|
||||||
|
defer func() {
|
||||||
|
require.Nil(t.T(), recover())
|
||||||
|
}()
|
||||||
|
|
||||||
|
testLog := func() {
|
||||||
|
t.manager.log("test", logging.CRITICAL)
|
||||||
|
t.manager.log("test", logging.ERROR)
|
||||||
|
t.manager.log("test", logging.WARNING)
|
||||||
|
t.manager.log("test", logging.NOTICE)
|
||||||
|
t.manager.log("test", logging.INFO)
|
||||||
|
t.manager.log("test", logging.DEBUG)
|
||||||
|
}
|
||||||
|
t.manager.SetLogging(false)
|
||||||
|
testLog()
|
||||||
|
t.manager.SetLogging(true)
|
||||||
|
t.manager.logger = nil
|
||||||
|
testLog()
|
||||||
|
t.manager.logger = NewLogger("test", logging.DEBUG, DefaultLogFormatter())
|
||||||
|
testLog()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user