diff --git a/core/health/counter.go b/core/healthcheck/counter.go similarity index 99% rename from core/health/counter.go rename to core/healthcheck/counter.go index 584e609..3a8c2dd 100644 --- a/core/health/counter.go +++ b/core/healthcheck/counter.go @@ -1,4 +1,4 @@ -package health +package healthcheck import ( "time" diff --git a/core/health/iface.go b/core/healthcheck/iface.go similarity index 89% rename from core/health/iface.go rename to core/healthcheck/iface.go index 7c11b87..9c53ddf 100644 --- a/core/health/iface.go +++ b/core/healthcheck/iface.go @@ -1,4 +1,13 @@ -package health +package healthcheck + +var ( + // compile-time checks to ensure that implementations are compatible with the interface + _ = Storage(&SyncMapStorage{}) + _ = Counter(&AtomicCounter{}) + _ = Processor(CounterProcessor{}) + _ = NotifyFunc(DefaultNotifyFunc) + _ = CounterConstructor(NewAtomicCounter) +) // Storage stores different instances of Counter. Implementation should be goroutine-safe. type Storage interface { @@ -55,7 +64,7 @@ type Counter interface { // Processor is used to check if Counter is in error state and act accordingly. type Processor interface { // Process counter data. This method is not goroutine-safe! - Process(id int, counter Counter) + Process(id int, counter Counter) bool } // NotifyMessageLocalizer is the smallest subset of core.Localizer used in the @@ -66,11 +75,11 @@ type NotifyMessageLocalizer interface { // NotifyFunc will send notification about error to the system with provided credentials. // It will send the notification to system admins. -type NotifyFunc func(apiURL, apiKey, msg string) +type NotifyFunc func(apiURL, apiKey, msg string) error // CounterConstructor is used to create counters. This way you can implement your own counter and still use default CounterStorage. type CounterConstructor func(name string) Counter // ConnectionDataProvider should return the connection credentials and language by counter ID. // It's best to use account ID as a counter ID to be able to retrieve the necessary data as easy as possible. -type ConnectionDataProvider func(id int) (apiURL, apiKey, lang string) +type ConnectionDataProvider func(id int) (apiURL, apiKey, lang string, exists bool) diff --git a/core/healthcheck/notifier.go b/core/healthcheck/notifier.go new file mode 100644 index 0000000..8fe3859 --- /dev/null +++ b/core/healthcheck/notifier.go @@ -0,0 +1,13 @@ +package healthcheck + +import retailcrm "github.com/retailcrm/api-client-go/v2" + +func DefaultNotifyFunc(apiURL, apiKey, msg string) error { + client := retailcrm.New(apiURL, apiKey) + _, err := client.NotificationsSend(retailcrm.NotificationsSendRequest{ + UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins}, + Type: retailcrm.NotificationTypeError, + Message: msg, + }) + return err +} diff --git a/core/health/processor.go b/core/healthcheck/processor.go similarity index 54% rename from core/health/processor.go rename to core/healthcheck/processor.go index fe469ea..ddfb13e 100644 --- a/core/health/processor.go +++ b/core/healthcheck/processor.go @@ -1,4 +1,12 @@ -package health +package healthcheck + +import ( + "errors" + + "github.com/retailcrm/mg-transport-core/v2/core/logger" +) + +var ErrNoConnection = errors.New("no connection") const ( // DefaultMinRequests is a default minimal threshold of total requests. If Counter has less than this amount of requests @@ -13,23 +21,34 @@ const ( // CounterProcessor is a default implementation of Processor. It will try to localize the message in case of error. type CounterProcessor struct { Localizer NotifyMessageLocalizer + Logger logger.Logger Notifier NotifyFunc ConnectionDataProvider ConnectionDataProvider Error string FailureThreshold float64 MinRequests uint32 + Debug bool } -func (c CounterProcessor) Process(id int, counter Counter) { +func (c CounterProcessor) Process(id int, counter Counter) bool { if counter.IsFailed() { if counter.IsFailureProcessed() { - return + c.debugLog("skipping counter id=%d because its failure is already processed", id) + return true } - apiURL, apiKey, _ := c.ConnectionDataProvider(id) - c.Notifier(apiURL, apiKey, counter.Message()) + apiURL, apiKey, _, exists := c.ConnectionDataProvider(id) + if !exists { + c.debugLog("cannot find connection data for counter id=%d", id) + return true + } + err := c.Notifier(apiURL, apiKey, counter.Message()) + if err != nil { + c.debugLog("cannot send notification for counter id=%d: %s (message: %s)", + id, err, counter.Message()) + } counter.FailureProcessed() - return + return true } succeeded := counter.TotalSucceeded() @@ -38,7 +57,8 @@ func (c CounterProcessor) Process(id int, counter Counter) { // Ignore this counter for now because total count of requests is less than minimal count. // The results may not be representative. if (succeeded + failed) < c.MinRequests { - return + c.debugLog("skipping counter id=%d because it has fewer than %d requests", id, c.MinRequests) + return true } // If more than FailureThreshold % of requests are successful, don't do anything. @@ -46,18 +66,26 @@ func (c CounterProcessor) Process(id int, counter Counter) { if (float64(succeeded) / float64(succeeded+failed)) >= c.FailureThreshold { counter.ClearCountersProcessed() counter.FlushCounters() - return + return true } - // Do not process counters values twice if error ocurred. + // Do not process counters values twice if error occurred. if counter.IsCountersProcessed() { - return + return true } - apiURL, apiKey, lang := c.ConnectionDataProvider(id) - c.Notifier(apiURL, apiKey, c.getErrorText(counter.Name(), c.Error, lang)) + apiURL, apiKey, lang, exists := c.ConnectionDataProvider(id) + if !exists { + c.debugLog("cannot find connection data for counter id=%d", id) + return true + } + err := c.Notifier(apiURL, apiKey, c.getErrorText(counter.Name(), c.Error, lang)) + if err != nil { + c.debugLog("cannot send notification for counter id=%d: %s (message: %s)", + id, err, counter.Message()) + } counter.CountersProcessed() - return + return true } func (c CounterProcessor) getErrorText(name, msg, lang string) string { @@ -65,5 +93,13 @@ func (c CounterProcessor) getErrorText(name, msg, lang string) string { return msg } c.Localizer.SetLocale(lang) - return c.Localizer.GetLocalizedTemplateMessage(msg, map[string]interface{}{"Name": name}) + return c.Localizer.GetLocalizedTemplateMessage(msg, map[string]interface{}{ + "Name": name, + }) +} + +func (c CounterProcessor) debugLog(msg string, args ...interface{}) { + if c.Debug { + c.Logger.Debugf(msg, args...) + } } diff --git a/core/health/storage.go b/core/healthcheck/storage.go similarity index 92% rename from core/health/storage.go rename to core/healthcheck/storage.go index 5ae4870..1c7dd47 100644 --- a/core/health/storage.go +++ b/core/healthcheck/storage.go @@ -1,4 +1,4 @@ -package health +package healthcheck import "sync" @@ -33,7 +33,6 @@ func (s *SyncMapStorage) Remove(id int) { func (s *SyncMapStorage) Process(proc Processor) { s.m.Range(func(key, value any) bool { - proc.Process(key.(int), value.(Counter)) - return false + return proc.Process(key.(int), value.(Counter)) }) } diff --git a/core/job_manager.go b/core/job_manager.go index d6fc49a..fac84de 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -12,6 +12,15 @@ import ( // JobFunc is empty func which should be executed in a parallel goroutine. type JobFunc func(logger.Logger) error +// JobAfterCallback will be called after specific job is done. +// This can be used to run something after asynchronous job is done. The function will +// receive an error from the job which can be used to alter the callback behavior. If callback +// returns an error, it will be processed using job ErrorHandler. +// The callback won't be executed in case of panic. Panicked callback will be show in logs +// as if the job itself panicked, ergo, do not write jobs or callbacks that can panic, +// or process the panic by yourself. +type JobAfterCallback func(jobError error, log logger.Logger) error + // JobErrorHandler is a function to handle jobs errors. First argument is a job name. type JobErrorHandler func(string, error, logger.Logger) @@ -32,21 +41,22 @@ type Job struct { // JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as // singleton), or jobs can be executed as regular jobs. Example initialization: -// manager := NewJobManager(). -// SetLogger(logger). -// SetLogging(false) -// _ = manager.RegisterJob("updateTokens", &Job{ -// Command: func(log logger.Logger) error { -// // logic goes here... -// logger.Info("All tokens were updated successfully") -// return nil -// }, -// ErrorHandler: DefaultJobErrorHandler(), -// PanicHandler: DefaultJobPanicHandler(), -// Interval: time.Hour * 3, -// Regular: true, -// }) -// manager.Start() +// +// manager := NewJobManager(). +// SetLogger(logger). +// SetLogging(false) +// _ = manager.RegisterJob("updateTokens", &Job{ +// Command: func(log logger.Logger) error { +// // logic goes here... +// logger.Info("All tokens were updated successfully") +// return nil +// }, +// ErrorHandler: DefaultJobErrorHandler(), +// PanicHandler: DefaultJobPanicHandler(), +// Interval: time.Hour * 3, +// Regular: true, +// }) +// manager.Start() type JobManager struct { logger logger.Logger nilLogger logger.Logger @@ -55,17 +65,24 @@ type JobManager struct { } // getWrappedFunc wraps job into function. -func (j *Job) getWrappedFunc(name string, log logger.Logger) func() { - return func() { +func (j *Job) getWrappedFunc(name string, log logger.Logger) func(callback JobAfterCallback) { + return func(callback JobAfterCallback) { defer func() { if r := recover(); r != nil && j.PanicHandler != nil { j.PanicHandler(name, r, log) } }() - if err := j.Command(log); err != nil && j.ErrorHandler != nil { + err := j.Command(log) + if err != nil && j.ErrorHandler != nil { j.ErrorHandler(name, err, log) } + if callback != nil { + err := callback(err, log) + if j.ErrorHandler != nil { + j.ErrorHandler(name, err, log) + } + } } } @@ -77,7 +94,7 @@ func (j *Job) getWrappedTimerFunc(name string, log logger.Logger) func(chan bool case <-stopChannel: return default: - j.getWrappedFunc(name, log)() + j.getWrappedFunc(name, log)(nil) } } } @@ -118,13 +135,13 @@ func (j *Job) stop() { } // runOnce run job once. -func (j *Job) runOnce(name string, log logger.Logger) { - go j.getWrappedFunc(name, log)() +func (j *Job) runOnce(name string, log logger.Logger, callback JobAfterCallback) { + go j.getWrappedFunc(name, log)(callback) } // runOnceSync run job once in current goroutine. func (j *Job) runOnceSync(name string, log logger.Logger) { - j.getWrappedFunc(name, log)() + j.getWrappedFunc(name, log)(nil) } // NewJobManager is a JobManager constructor. @@ -241,15 +258,52 @@ func (j *JobManager) StopJob(name string) error { } // RunJobOnce starts provided job once if it exists. It's also async. -func (j *JobManager) RunJobOnce(name string) error { +func (j *JobManager) RunJobOnce(name string, callback ...JobAfterCallback) error { if job, ok := j.FetchJob(name); ok { - job.runOnce(name, j.Logger()) + var cb JobAfterCallback + if len(callback) > 0 { + cb = callback[0] + } + job.runOnce(name, j.Logger(), cb) return nil } return fmt.Errorf("cannot find job `%s`", name) } +// RunJobsOnceSequentially will execute provided jobs asynchronously. It uses JobAfterCallback under the hood. +// You can prevent subsequent jobs from running using stopOnError flag. +func (j *JobManager) RunJobsOnceSequentially(names []string, stopOnError bool) error { + if len(names) == 0 { + return nil + } + + var chained JobAfterCallback + for i := len(names) - 1; i > 0; i-- { + i := i + if chained == nil { + chained = func(jobError error, log logger.Logger) error { + if jobError != nil && stopOnError { + return jobError + } + return j.RunJobOnce(names[i]) + } + continue + } + + oldCallback := chained + chained = func(jobError error, log logger.Logger) error { + if jobError != nil && stopOnError { + return jobError + } + err := j.RunJobOnce(names[i], oldCallback) + return err + } + } + + return j.RunJobOnce(names[0], chained) +} + // RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work. func (j *JobManager) RunJobOnceSync(name string) error { if job, ok := j.FetchJob(name); ok { diff --git a/core/job_manager_test.go b/core/job_manager_test.go index b71a86a..6da22de 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -293,7 +293,7 @@ func (t *JobTest) Test_getWrappedFunc() { t.onceJob() fn := t.job.getWrappedFunc("job", t.testLogger()) require.NotNil(t.T(), fn) - go fn() + go fn(nil) assert.True(t.T(), t.executed()) assert.False(t.T(), t.errored(time.Millisecond)) assert.False(t.T(), t.panicked(time.Millisecond)) @@ -308,7 +308,7 @@ func (t *JobTest) Test_getWrappedFuncError() { t.onceErrorJob() fn := t.job.getWrappedFunc("job", t.testLogger()) require.NotNil(t.T(), fn) - go fn() + go fn(nil) assert.True(t.T(), t.executed()) assert.True(t.T(), t.errored(time.Millisecond)) assert.False(t.T(), t.panicked(time.Millisecond)) @@ -323,7 +323,7 @@ func (t *JobTest) Test_getWrappedFuncPanic() { t.oncePanicJob() fn := t.job.getWrappedFunc("job", t.testLogger()) require.NotNil(t.T(), fn) - go fn() + go fn(nil) assert.True(t.T(), t.executed()) assert.False(t.T(), t.errored(time.Millisecond)) assert.True(t.T(), t.panicked(time.Millisecond)) @@ -347,7 +347,7 @@ func (t *JobTest) Test_runOnce() { }() t.regularJob() - t.job.runOnce("job", t.testLogger()) + t.job.runOnce("job", t.testLogger(), nil) time.Sleep(time.Millisecond * 5) require.True(t.T(), t.executed()) first := 0 diff --git a/core/module_featurees_uploader_test.go b/core/module_features_uploader_test.go similarity index 100% rename from core/module_featurees_uploader_test.go rename to core/module_features_uploader_test.go diff --git a/go.mod b/go.mod index 15c96b2..5303606 100644 --- a/go.mod +++ b/go.mod @@ -1,44 +1,78 @@ module github.com/retailcrm/mg-transport-core/v2 -go 1.12 +go 1.18 require ( - cloud.google.com/go v0.60.0 // indirect github.com/DATA-DOG/go-sqlmock v1.3.3 github.com/aws/aws-sdk-go v1.36.30 github.com/aws/aws-sdk-go-v2 v1.17.2 github.com/aws/aws-sdk-go-v2/config v1.18.4 github.com/aws/aws-sdk-go-v2/service/s3 v1.29.5 github.com/blacked/go-zabbix v0.0.0-20170118040903-3c6a95ec4fdc - github.com/denisenkom/go-mssqldb v0.0.0-20190830225923-3302f0226fbd // indirect github.com/getsentry/sentry-go v0.12.0 github.com/gin-contrib/multitemplate v0.0.0-20190914010127-bba2ccfe37ec github.com/gin-gonic/gin v1.7.2 github.com/go-playground/validator/v10 v10.8.0 - github.com/go-sql-driver/mysql v1.5.0 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/gomarkdown/markdown v0.0.0-20221013030248-663e2500819c github.com/gorilla/securecookie v1.1.1 github.com/gorilla/sessions v1.2.0 github.com/jessevdk/go-flags v1.4.0 github.com/jinzhu/gorm v1.9.11 - github.com/json-iterator/go v1.1.11 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/lib/pq v1.9.0 // indirect github.com/nicksnyder/go-i18n/v2 v2.0.2 - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/pkg/errors v0.9.1 - github.com/retailcrm/api-client-go/v2 v2.0.12 + github.com/retailcrm/api-client-go/v2 v2.1.3 github.com/retailcrm/mg-transport-api-client-go v1.1.32 github.com/retailcrm/zabbix-metrics-collector v1.0.0 github.com/stretchr/testify v1.8.1 - github.com/ugorji/go v1.2.6 // indirect go.uber.org/atomic v1.10.0 golang.org/x/text v0.3.7 - google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/gormigrate.v1 v1.6.0 gopkg.in/h2non/gock.v1 v1.1.2 gopkg.in/yaml.v2 v2.4.0 ) + +require ( + cloud.google.com/go v0.60.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.21 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.20 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect + github.com/aws/smithy-go v1.13.5 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/denisenkom/go-mssqldb v0.0.0-20190830225923-3302f0226fbd // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.13.0 // indirect + github.com/go-playground/universal-translator v0.17.0 // indirect + github.com/go-sql-driver/mysql v1.5.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-querystring v1.1.0 // indirect + github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/json-iterator/go v1.1.11 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/leodido/go-urn v1.2.1 // indirect + github.com/lib/pq v1.9.0 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/ugorji/go/codec v1.2.6 // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect + golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect + google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 3699060..a04db1a 100644 --- a/go.sum +++ b/go.sum @@ -361,8 +361,8 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/retailcrm/api-client-go/v2 v2.0.12 h1:0pMDsz4fzKpizUxoF1qtgwN+oJ+YJruncCj6OTOB3e8= -github.com/retailcrm/api-client-go/v2 v2.0.12/go.mod h1:1yTZl9+gd3+/k0kAJe7sYvC+mL4fqMwIwtnSgSWZlkQ= +github.com/retailcrm/api-client-go/v2 v2.1.3 h1:AVcp9oeSOm6+3EWXCgdQs+XE3PTjzCKKB//MUAe0Zb0= +github.com/retailcrm/api-client-go/v2 v2.1.3/go.mod h1:1yTZl9+gd3+/k0kAJe7sYvC+mL4fqMwIwtnSgSWZlkQ= github.com/retailcrm/mg-transport-api-client-go v1.1.32 h1:IBPltSoD5q2PPZJbNC/prK5F9rEVPXVx/ZzDpi7HKhs= github.com/retailcrm/mg-transport-api-client-go v1.1.32/go.mod h1:AWV6BueE28/6SCoyfKURTo4lF0oXYoOKmHTzehd5vAI= github.com/retailcrm/zabbix-metrics-collector v1.0.0 h1:ju3rhpgVoiKII6oXEJEf2eoJy5bNcYAmOPRp1oPWDmA= @@ -397,7 +397,6 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go v1.2.6 h1:tGiWC9HENWE2tqYycIqFTNorMmFRVhNwCpDOpWqnk8E= github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=