mirror of
https://github.com/retailcrm/mg-transport-core.git
synced 2024-11-25 06:36:03 +03:00
WIP: healthcheck counters
This commit is contained in:
parent
6ae59d2f49
commit
7bff09f467
97
core/health/counter.go
Normal file
97
core/health/counter.go
Normal file
@ -0,0 +1,97 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
// DefaultResetPeriod is a default period for AtomicCounter after which internal request counters will be reset.
|
||||
const DefaultResetPeriod = time.Minute * 15
|
||||
|
||||
// AtomicCounter is a default Counter implementation.
|
||||
// It uses atomics under the hood (hence the name) and can be configured with custom reset timeout and
|
||||
type AtomicCounter struct {
|
||||
msg atomic.String
|
||||
timestamp atomic.Time
|
||||
resetPeriod time.Duration
|
||||
failure atomic.Uint32
|
||||
failed atomic.Bool
|
||||
failureProcessed atomic.Bool
|
||||
countersProcessed atomic.Bool
|
||||
success atomic.Uint32
|
||||
}
|
||||
|
||||
// NewAtomicCounterWithPeriod returns AtomicCounter configured with provided period.
|
||||
func NewAtomicCounterWithPeriod(resetPeriod time.Duration) Counter {
|
||||
c := &AtomicCounter{}
|
||||
c.resetPeriod = resetPeriod
|
||||
c.timestamp.Store(time.Now())
|
||||
return c
|
||||
}
|
||||
|
||||
// NewAtomicCounter returns AtomicCounter with DefaultResetPeriod.
|
||||
func NewAtomicCounter() Counter {
|
||||
return NewAtomicCounterWithPeriod(DefaultResetPeriod)
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) HitSuccess() {
|
||||
a.success.Add(1)
|
||||
if a.failed.CompareAndSwap(true, false) {
|
||||
a.failureProcessed.Store(false)
|
||||
a.msg.Store("")
|
||||
}
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) HitFailure() {
|
||||
a.failure.Add(1)
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) TotalSucceeded() uint32 {
|
||||
return a.success.Load()
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) TotalFailed() uint32 {
|
||||
return a.failure.Load()
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) Failed(message string) {
|
||||
a.msg.Store(message)
|
||||
a.failed.Store(true)
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) IsFailed() bool {
|
||||
return a.failed.Load()
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) Message() string {
|
||||
return a.msg.Load()
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) IsFailureProcessed() bool {
|
||||
return a.failureProcessed.Load()
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) FailureProcessed() {
|
||||
a.failureProcessed.Store(true)
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) IsCountersProcessed() bool {
|
||||
return a.countersProcessed.Load()
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) CountersProcessed() {
|
||||
a.countersProcessed.Store(true)
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) ClearCountersProcessed() {
|
||||
a.countersProcessed.Store(false)
|
||||
}
|
||||
|
||||
func (a *AtomicCounter) FlushCounters() {
|
||||
if time.Now().After(a.timestamp.Load().Add(time.Minute * 15)) {
|
||||
a.timestamp.Store(time.Now())
|
||||
a.success.Store(0)
|
||||
a.failure.Store(0)
|
||||
}
|
||||
}
|
71
core/health/iface.go
Normal file
71
core/health/iface.go
Normal file
@ -0,0 +1,71 @@
|
||||
package health
|
||||
|
||||
// Storage stores different instances of Counter. Implementation should be goroutine-safe.
|
||||
type Storage interface {
|
||||
// Get counter by its ID. The counter will be instantiated automatically if necessary.
|
||||
Get(id int) Counter
|
||||
// Remove counter if it exists.
|
||||
Remove(id int)
|
||||
// Process will iterate over counters and call Processor on each of them.
|
||||
// This method is used to collect counters data & send notifications.
|
||||
Process(processor Processor)
|
||||
}
|
||||
|
||||
// Counter will count successful and failed requests. Its contents can be used to judge if specific entity (e.g. Connection / Account)
|
||||
// is not working properly (invalid credentials, too many failed requests, etc) and take further action based on the result.
|
||||
// Implementation should be goroutine-safe.
|
||||
type Counter interface {
|
||||
// HitSuccess registers successful request. It should automatically clear error state because that state should be
|
||||
// used only if error is totally unrecoverable.
|
||||
HitSuccess()
|
||||
// HitFailure registers failed request.
|
||||
HitFailure()
|
||||
// TotalSucceeded returns how many requests were successful.
|
||||
TotalSucceeded() uint32
|
||||
// TotalFailed returns how many requests have failed.
|
||||
TotalFailed() uint32
|
||||
// Failed will put Counter into failed state with specific error message.
|
||||
Failed(message string)
|
||||
// IsFailed returns true if Counter is in failed state.
|
||||
IsFailed() bool
|
||||
// Message will return error message if Counter is in failed state.
|
||||
Message() string
|
||||
// IsFailureProcessed will return true if current error inside counter has been processed already.
|
||||
IsFailureProcessed() bool
|
||||
// FailureProcessed will mark current error inside Counter as processed.
|
||||
FailureProcessed()
|
||||
// IsCountersProcessed returns true if counters value has been processed by the checker.
|
||||
// This can be used if you want to process counter values only once.
|
||||
IsCountersProcessed() bool
|
||||
// CountersProcessed will mark current counters value as processed.
|
||||
CountersProcessed()
|
||||
// ClearCountersProcessed will set IsCountersProcessed to false.
|
||||
ClearCountersProcessed()
|
||||
// FlushCounters will reset request counters if deemed necessary (for example, AtomicCounter will clear counters
|
||||
// only if their contents are older than provided time period).
|
||||
// This won't clear IsCountersProcessed flag!
|
||||
FlushCounters()
|
||||
}
|
||||
|
||||
// Processor is used to check if Counter is in error state and act accordingly.
|
||||
type Processor interface {
|
||||
// Process counter data. This method is not goroutine-safe!
|
||||
Process(id int, counter Counter)
|
||||
}
|
||||
|
||||
// NotifyMessageLocalizer is the smallest subset of core.Localizer used in the
|
||||
type NotifyMessageLocalizer interface {
|
||||
SetLocale(string)
|
||||
GetLocalizedMessage(string) string
|
||||
}
|
||||
|
||||
// NotifyFunc will send notification about error to the system with provided credentials.
|
||||
// It will send the notification to system admins.
|
||||
type NotifyFunc func(apiURL, apiKey, msg string)
|
||||
|
||||
// CounterConstructor is used to create counters. This way you can implement your own counter and still use default CounterStorage.
|
||||
type CounterConstructor func() Counter
|
||||
|
||||
// ConnectionDataProvider should return the connection credentials and language by counter ID.
|
||||
// It's best to use account ID as a counter ID to be able to retrieve the necessary data as easy as possible.
|
||||
type ConnectionDataProvider func(id int) (apiURL, apiKey, lang string)
|
69
core/health/processor.go
Normal file
69
core/health/processor.go
Normal file
@ -0,0 +1,69 @@
|
||||
package health
|
||||
|
||||
const (
|
||||
// DefaultMinRequests is a default minimal threshold of total requests. If Counter has less than this amount of requests
|
||||
// total, it will be skipped because it can trigger false alerts otherwise.
|
||||
DefaultMinRequests = 10
|
||||
|
||||
// DefaultFailureThreshold is a default value of successful requests that should be passed in order to suppress any
|
||||
// error notifications. If less than that percentage of requests are successful, the notification will be sent.
|
||||
DefaultFailureThreshold = 0.8
|
||||
)
|
||||
|
||||
// CounterProcessor is a default implementation of Processor. It will try to localize the message in case of error.
|
||||
type CounterProcessor struct {
|
||||
Localizer NotifyMessageLocalizer
|
||||
Notifier NotifyFunc
|
||||
ConnectionDataProvider ConnectionDataProvider
|
||||
Error string
|
||||
FailureThreshold float64
|
||||
MinRequests uint32
|
||||
}
|
||||
|
||||
func (c CounterProcessor) Process(id int, counter Counter) {
|
||||
if counter.IsFailed() {
|
||||
if counter.IsFailureProcessed() {
|
||||
return
|
||||
}
|
||||
|
||||
apiURL, apiKey, lang := c.ConnectionDataProvider(id)
|
||||
c.Notifier(apiURL, apiKey, c.getErrorText(counter.Message(), lang))
|
||||
counter.FailureProcessed()
|
||||
return
|
||||
}
|
||||
|
||||
succeeded := counter.TotalSucceeded()
|
||||
failed := counter.TotalFailed()
|
||||
|
||||
// Ignore this counter for now because total count of requests is less than minimal count.
|
||||
// The results may not be representative.
|
||||
if (succeeded + failed) < c.MinRequests {
|
||||
return
|
||||
}
|
||||
|
||||
// If more than FailureThreshold % of requests are successful, don't do anything.
|
||||
// Default value is 0.8 which would be 80% of successful requests.
|
||||
if (float64(succeeded) / float64(succeeded+failed)) >= c.FailureThreshold {
|
||||
counter.ClearCountersProcessed()
|
||||
counter.FlushCounters()
|
||||
return
|
||||
}
|
||||
|
||||
// Do not process counters values twice if error ocurred.
|
||||
if counter.IsCountersProcessed() {
|
||||
return
|
||||
}
|
||||
|
||||
apiURL, apiKey, lang := c.ConnectionDataProvider(id)
|
||||
c.Notifier(apiURL, apiKey, c.getErrorText(c.Error, lang))
|
||||
counter.CountersProcessed()
|
||||
return
|
||||
}
|
||||
|
||||
func (c CounterProcessor) getErrorText(msg, lang string) string {
|
||||
if c.Localizer == nil {
|
||||
return msg
|
||||
}
|
||||
c.Localizer.SetLocale(lang)
|
||||
return c.Localizer.GetLocalizedMessage(msg)
|
||||
}
|
37
core/health/storage.go
Normal file
37
core/health/storage.go
Normal file
@ -0,0 +1,37 @@
|
||||
package health
|
||||
|
||||
import "sync"
|
||||
|
||||
// SyncMapStorage is a default Storage implementation. It uses sync.Map under the hood because
|
||||
// deletions should be rare for the storage. If your business logic calls Remove often, it would be better
|
||||
// to use your own implementation with map[int]Counter and sync.RWMutex.
|
||||
type SyncMapStorage struct {
|
||||
constructor CounterConstructor
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
// NewSyncMapStorage is a SyncMapStorage constructor.
|
||||
func NewSyncMapStorage(constructor CounterConstructor) Storage {
|
||||
return &SyncMapStorage{constructor: constructor}
|
||||
}
|
||||
|
||||
func (s *SyncMapStorage) Get(id int) Counter {
|
||||
val, found := s.m.Load(id)
|
||||
if found {
|
||||
return val.(Counter)
|
||||
}
|
||||
c := s.constructor()
|
||||
s.m.Store(id, c)
|
||||
return c
|
||||
}
|
||||
|
||||
func (s *SyncMapStorage) Remove(id int) {
|
||||
s.m.Delete(id)
|
||||
}
|
||||
|
||||
func (s *SyncMapStorage) Process(proc Processor) {
|
||||
s.m.Range(func(key, value any) bool {
|
||||
proc.Process(key.(int), value.(Counter))
|
||||
return false
|
||||
})
|
||||
}
|
1
go.mod
1
go.mod
@ -34,6 +34,7 @@ require (
|
||||
github.com/retailcrm/zabbix-metrics-collector v1.0.0
|
||||
github.com/stretchr/testify v1.8.1
|
||||
github.com/ugorji/go v1.2.6 // indirect
|
||||
go.uber.org/atomic v1.10.0
|
||||
golang.org/x/text v0.3.7
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
|
||||
|
2
go.sum
2
go.sum
@ -424,6 +424,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
|
||||
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
|
||||
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
|
Loading…
Reference in New Issue
Block a user