diff --git a/core/health/counter.go b/core/health/counter.go new file mode 100644 index 0000000..78604c0 --- /dev/null +++ b/core/health/counter.go @@ -0,0 +1,97 @@ +package health + +import ( + "time" + + "go.uber.org/atomic" +) + +// DefaultResetPeriod is a default period for AtomicCounter after which internal request counters will be reset. +const DefaultResetPeriod = time.Minute * 15 + +// AtomicCounter is a default Counter implementation. +// It uses atomics under the hood (hence the name) and can be configured with custom reset timeout and +type AtomicCounter struct { + msg atomic.String + timestamp atomic.Time + resetPeriod time.Duration + failure atomic.Uint32 + failed atomic.Bool + failureProcessed atomic.Bool + countersProcessed atomic.Bool + success atomic.Uint32 +} + +// NewAtomicCounterWithPeriod returns AtomicCounter configured with provided period. +func NewAtomicCounterWithPeriod(resetPeriod time.Duration) Counter { + c := &AtomicCounter{} + c.resetPeriod = resetPeriod + c.timestamp.Store(time.Now()) + return c +} + +// NewAtomicCounter returns AtomicCounter with DefaultResetPeriod. +func NewAtomicCounter() Counter { + return NewAtomicCounterWithPeriod(DefaultResetPeriod) +} + +func (a *AtomicCounter) HitSuccess() { + a.success.Add(1) + if a.failed.CompareAndSwap(true, false) { + a.failureProcessed.Store(false) + a.msg.Store("") + } +} + +func (a *AtomicCounter) HitFailure() { + a.failure.Add(1) +} + +func (a *AtomicCounter) TotalSucceeded() uint32 { + return a.success.Load() +} + +func (a *AtomicCounter) TotalFailed() uint32 { + return a.failure.Load() +} + +func (a *AtomicCounter) Failed(message string) { + a.msg.Store(message) + a.failed.Store(true) +} + +func (a *AtomicCounter) IsFailed() bool { + return a.failed.Load() +} + +func (a *AtomicCounter) Message() string { + return a.msg.Load() +} + +func (a *AtomicCounter) IsFailureProcessed() bool { + return a.failureProcessed.Load() +} + +func (a *AtomicCounter) FailureProcessed() { + a.failureProcessed.Store(true) +} + +func (a *AtomicCounter) IsCountersProcessed() bool { + return a.countersProcessed.Load() +} + +func (a *AtomicCounter) CountersProcessed() { + a.countersProcessed.Store(true) +} + +func (a *AtomicCounter) ClearCountersProcessed() { + a.countersProcessed.Store(false) +} + +func (a *AtomicCounter) FlushCounters() { + if time.Now().After(a.timestamp.Load().Add(time.Minute * 15)) { + a.timestamp.Store(time.Now()) + a.success.Store(0) + a.failure.Store(0) + } +} diff --git a/core/health/iface.go b/core/health/iface.go new file mode 100644 index 0000000..eb3f143 --- /dev/null +++ b/core/health/iface.go @@ -0,0 +1,71 @@ +package health + +// Storage stores different instances of Counter. Implementation should be goroutine-safe. +type Storage interface { + // Get counter by its ID. The counter will be instantiated automatically if necessary. + Get(id int) Counter + // Remove counter if it exists. + Remove(id int) + // Process will iterate over counters and call Processor on each of them. + // This method is used to collect counters data & send notifications. + Process(processor Processor) +} + +// Counter will count successful and failed requests. Its contents can be used to judge if specific entity (e.g. Connection / Account) +// is not working properly (invalid credentials, too many failed requests, etc) and take further action based on the result. +// Implementation should be goroutine-safe. +type Counter interface { + // HitSuccess registers successful request. It should automatically clear error state because that state should be + // used only if error is totally unrecoverable. + HitSuccess() + // HitFailure registers failed request. + HitFailure() + // TotalSucceeded returns how many requests were successful. + TotalSucceeded() uint32 + // TotalFailed returns how many requests have failed. + TotalFailed() uint32 + // Failed will put Counter into failed state with specific error message. + Failed(message string) + // IsFailed returns true if Counter is in failed state. + IsFailed() bool + // Message will return error message if Counter is in failed state. + Message() string + // IsFailureProcessed will return true if current error inside counter has been processed already. + IsFailureProcessed() bool + // FailureProcessed will mark current error inside Counter as processed. + FailureProcessed() + // IsCountersProcessed returns true if counters value has been processed by the checker. + // This can be used if you want to process counter values only once. + IsCountersProcessed() bool + // CountersProcessed will mark current counters value as processed. + CountersProcessed() + // ClearCountersProcessed will set IsCountersProcessed to false. + ClearCountersProcessed() + // FlushCounters will reset request counters if deemed necessary (for example, AtomicCounter will clear counters + // only if their contents are older than provided time period). + // This won't clear IsCountersProcessed flag! + FlushCounters() +} + +// Processor is used to check if Counter is in error state and act accordingly. +type Processor interface { + // Process counter data. This method is not goroutine-safe! + Process(id int, counter Counter) +} + +// NotifyMessageLocalizer is the smallest subset of core.Localizer used in the +type NotifyMessageLocalizer interface { + SetLocale(string) + GetLocalizedMessage(string) string +} + +// NotifyFunc will send notification about error to the system with provided credentials. +// It will send the notification to system admins. +type NotifyFunc func(apiURL, apiKey, msg string) + +// CounterConstructor is used to create counters. This way you can implement your own counter and still use default CounterStorage. +type CounterConstructor func() Counter + +// ConnectionDataProvider should return the connection credentials and language by counter ID. +// It's best to use account ID as a counter ID to be able to retrieve the necessary data as easy as possible. +type ConnectionDataProvider func(id int) (apiURL, apiKey, lang string) diff --git a/core/health/processor.go b/core/health/processor.go new file mode 100644 index 0000000..35804ca --- /dev/null +++ b/core/health/processor.go @@ -0,0 +1,69 @@ +package health + +const ( + // DefaultMinRequests is a default minimal threshold of total requests. If Counter has less than this amount of requests + // total, it will be skipped because it can trigger false alerts otherwise. + DefaultMinRequests = 10 + + // DefaultFailureThreshold is a default value of successful requests that should be passed in order to suppress any + // error notifications. If less than that percentage of requests are successful, the notification will be sent. + DefaultFailureThreshold = 0.8 +) + +// CounterProcessor is a default implementation of Processor. It will try to localize the message in case of error. +type CounterProcessor struct { + Localizer NotifyMessageLocalizer + Notifier NotifyFunc + ConnectionDataProvider ConnectionDataProvider + Error string + FailureThreshold float64 + MinRequests uint32 +} + +func (c CounterProcessor) Process(id int, counter Counter) { + if counter.IsFailed() { + if counter.IsFailureProcessed() { + return + } + + apiURL, apiKey, lang := c.ConnectionDataProvider(id) + c.Notifier(apiURL, apiKey, c.getErrorText(counter.Message(), lang)) + counter.FailureProcessed() + return + } + + succeeded := counter.TotalSucceeded() + failed := counter.TotalFailed() + + // Ignore this counter for now because total count of requests is less than minimal count. + // The results may not be representative. + if (succeeded + failed) < c.MinRequests { + return + } + + // If more than FailureThreshold % of requests are successful, don't do anything. + // Default value is 0.8 which would be 80% of successful requests. + if (float64(succeeded) / float64(succeeded+failed)) >= c.FailureThreshold { + counter.ClearCountersProcessed() + counter.FlushCounters() + return + } + + // Do not process counters values twice if error ocurred. + if counter.IsCountersProcessed() { + return + } + + apiURL, apiKey, lang := c.ConnectionDataProvider(id) + c.Notifier(apiURL, apiKey, c.getErrorText(c.Error, lang)) + counter.CountersProcessed() + return +} + +func (c CounterProcessor) getErrorText(msg, lang string) string { + if c.Localizer == nil { + return msg + } + c.Localizer.SetLocale(lang) + return c.Localizer.GetLocalizedMessage(msg) +} diff --git a/core/health/storage.go b/core/health/storage.go new file mode 100644 index 0000000..b74a7c4 --- /dev/null +++ b/core/health/storage.go @@ -0,0 +1,37 @@ +package health + +import "sync" + +// SyncMapStorage is a default Storage implementation. It uses sync.Map under the hood because +// deletions should be rare for the storage. If your business logic calls Remove often, it would be better +// to use your own implementation with map[int]Counter and sync.RWMutex. +type SyncMapStorage struct { + constructor CounterConstructor + m sync.Map +} + +// NewSyncMapStorage is a SyncMapStorage constructor. +func NewSyncMapStorage(constructor CounterConstructor) Storage { + return &SyncMapStorage{constructor: constructor} +} + +func (s *SyncMapStorage) Get(id int) Counter { + val, found := s.m.Load(id) + if found { + return val.(Counter) + } + c := s.constructor() + s.m.Store(id, c) + return c +} + +func (s *SyncMapStorage) Remove(id int) { + s.m.Delete(id) +} + +func (s *SyncMapStorage) Process(proc Processor) { + s.m.Range(func(key, value any) bool { + proc.Process(key.(int), value.(Counter)) + return false + }) +} diff --git a/go.mod b/go.mod index d48d79e..15c96b2 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/retailcrm/zabbix-metrics-collector v1.0.0 github.com/stretchr/testify v1.8.1 github.com/ugorji/go v1.2.6 // indirect + go.uber.org/atomic v1.10.0 golang.org/x/text v0.3.7 google.golang.org/protobuf v1.27.1 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect diff --git a/go.sum b/go.sum index 27f0915..3699060 100644 --- a/go.sum +++ b/go.sum @@ -424,6 +424,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=