Healthcheck support

This commit is contained in:
Pavel 2023-01-10 10:30:44 +03:00 committed by GitHub
commit 594d565d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1108 additions and 50 deletions

View File

@ -19,11 +19,10 @@ jobs:
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Set up Go 1.17
uses: actions/setup-go@v2
- name: Set up stable Go version
uses: actions/setup-go@v3
with:
# TODO: Should migrate to 1.18 later
go-version: '1.17'
go-version: 'stable'
- name: Get dependencies
run: go mod tidy
- name: Lint code with golangci-lint
@ -36,10 +35,10 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ['1.16', '1.17', '1.18']
go-version: ['1.18', '1.19', 'stable']
steps:
- name: Set up Go ${{ matrix.go-version }}
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
- name: Check out code into the Go module directory

107
core/healthcheck/counter.go Normal file
View File

@ -0,0 +1,107 @@
package healthcheck
import (
"time"
"go.uber.org/atomic"
)
// DefaultResetPeriod is a default period for AtomicCounter after which internal request counters will be reset.
const DefaultResetPeriod = time.Minute * 15
// AtomicCounter is a default Counter implementation.
// It uses atomics under the hood (hence the name) and can be configured with custom reset timeout and
type AtomicCounter struct {
name atomic.String
msg atomic.String
timestamp atomic.Time
resetPeriod time.Duration
failure atomic.Uint32
failed atomic.Bool
failureProcessed atomic.Bool
countersProcessed atomic.Bool
success atomic.Uint32
}
// NewAtomicCounterWithPeriod returns AtomicCounter configured with provided period.
func NewAtomicCounterWithPeriod(name string, resetPeriod time.Duration) Counter {
c := &AtomicCounter{}
c.SetName(name)
c.resetPeriod = resetPeriod
c.timestamp.Store(time.Now())
return c
}
// NewAtomicCounter returns AtomicCounter with DefaultResetPeriod.
func NewAtomicCounter(name string) Counter {
return NewAtomicCounterWithPeriod(name, DefaultResetPeriod)
}
func (a *AtomicCounter) Name() string {
return a.name.Load()
}
func (a *AtomicCounter) SetName(name string) {
a.name.Store(name)
}
func (a *AtomicCounter) HitSuccess() {
a.success.Add(1)
if a.failed.CompareAndSwap(true, false) {
a.failureProcessed.Store(false)
a.msg.Store("")
}
}
func (a *AtomicCounter) HitFailure() {
a.failure.Add(1)
}
func (a *AtomicCounter) TotalSucceeded() uint32 {
return a.success.Load()
}
func (a *AtomicCounter) TotalFailed() uint32 {
return a.failure.Load()
}
func (a *AtomicCounter) Failed(message string) {
a.msg.Store(message)
a.failed.Store(true)
}
func (a *AtomicCounter) IsFailed() bool {
return a.failed.Load()
}
func (a *AtomicCounter) Message() string {
return a.msg.Load()
}
func (a *AtomicCounter) IsFailureProcessed() bool {
return a.failureProcessed.Load()
}
func (a *AtomicCounter) FailureProcessed() {
a.failureProcessed.Store(true)
}
func (a *AtomicCounter) IsCountersProcessed() bool {
return a.countersProcessed.Load()
}
func (a *AtomicCounter) CountersProcessed() {
a.countersProcessed.Store(true)
}
func (a *AtomicCounter) ClearCountersProcessed() {
a.countersProcessed.Store(false)
}
func (a *AtomicCounter) FlushCounters() {
if time.Now().After(a.timestamp.Load().Add(time.Minute * 15)) {
a.timestamp.Store(time.Now())
a.success.Store(0)
a.failure.Store(0)
}
}

View File

@ -0,0 +1,123 @@
package healthcheck
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/suite"
)
type AtomicCounterTest struct {
suite.Suite
}
func TestAtomicCounter(t *testing.T) {
t.Parallel()
suite.Run(t, new(AtomicCounterTest))
}
func (t *AtomicCounterTest) new() Counter {
return NewAtomicCounter("test")
}
func (t *AtomicCounterTest) Test_Name() {
t.Assert().Equal("test", t.new().Name())
}
func (t *AtomicCounterTest) Test_SetName() {
c := t.new()
c.SetName("new")
t.Assert().Equal("new", c.Name())
}
func (t *AtomicCounterTest) Test_HitSuccess() {
c := t.new()
c.HitSuccess()
t.Assert().Equal(uint32(1), c.TotalSucceeded())
c.Failed("test")
c.FailureProcessed()
c.HitSuccess()
t.Assert().Equal(uint32(2), c.TotalSucceeded())
t.Assert().False(c.IsFailed())
t.Assert().False(c.IsFailureProcessed())
t.Assert().Equal("", c.Message())
}
func (t *AtomicCounterTest) Test_HitFailure() {
c := t.new()
c.HitFailure()
t.Assert().Equal(uint32(1), c.TotalFailed())
c.HitFailure()
t.Assert().Equal(uint32(2), c.TotalFailed())
}
func (t *AtomicCounterTest) Test_Failed() {
c := t.new()
t.Require().False(c.IsFailed())
t.Require().Equal("", c.Message())
c.Failed("message")
t.Assert().True(c.IsFailed())
t.Assert().Equal("message", c.Message())
}
func (t *AtomicCounterTest) Test_CountersProcessed() {
c := t.new()
t.Require().False(c.IsCountersProcessed())
c.CountersProcessed()
t.Assert().True(c.IsCountersProcessed())
c.ClearCountersProcessed()
t.Assert().False(c.IsCountersProcessed())
}
func (t *AtomicCounterTest) Test_FlushCounters() {
c := t.new()
c.HitSuccess()
t.Require().Equal(uint32(1), c.TotalSucceeded())
c.FlushCounters()
t.Assert().Equal(uint32(1), c.TotalSucceeded())
c.(*AtomicCounter).timestamp.Store(time.Now().Add(-(DefaultResetPeriod + time.Second)))
c.FlushCounters()
t.Assert().Equal(uint32(0), c.TotalSucceeded())
}
func (t *AtomicCounterTest) Test_Concurrency() {
c := t.new()
var wg sync.WaitGroup
wg.Add(2)
go func() {
for i := 0; i < 1000; i++ {
c.HitSuccess()
}
wg.Done()
}()
go func() {
for i := 0; i < 500; i++ {
// this delay will ensure that failure is being called after success.
// technically, both have been executed concurrently because first 399 calls will not be delayed.
if i > 399 {
time.Sleep(time.Microsecond)
}
if i > 400 {
c.Failed("total failure")
continue
}
c.HitFailure()
}
c.FailureProcessed()
wg.Done()
}()
wg.Wait()
t.Assert().Equal(uint32(1000), c.TotalSucceeded())
t.Assert().Equal(uint32(401), c.TotalFailed())
t.Assert().True(c.IsFailed())
t.Assert().True(c.IsFailureProcessed())
t.Assert().Equal("total failure", c.Message())
}

87
core/healthcheck/iface.go Normal file
View File

@ -0,0 +1,87 @@
package healthcheck
var (
// compile-time checks to ensure that implementations are compatible with the interface
_ = Storage(&SyncMapStorage{})
_ = Counter(&AtomicCounter{})
_ = Processor(CounterProcessor{})
_ = NotifyFunc(DefaultNotifyFunc)
_ = CounterConstructor(NewAtomicCounter)
)
// Storage stores different instances of Counter. Implementation should be goroutine-safe.
type Storage interface {
// Get counter by its ID. The counter will be instantiated automatically if necessary.
// Name here is not used to identify the counter in the storage.
Get(id int, name string) Counter
// Remove counter if it exists.
Remove(id int)
// Process will iterate over counters and call Processor on each of them.
// This method is used to collect counters data & send notifications.
Process(processor Processor)
}
// Counter will count successful and failed requests. Its contents can be used
// to judge if specific entity (e.g. Connection / Account) is not working properly
// (invalid credentials, too many failed requests, etc) and take further action based on the result.
// Implementation should be goroutine-safe.
type Counter interface {
// Name can be used as a more friendly identifier for the counter.
Name() string
// SetName of the counter.
SetName(name string)
// HitSuccess registers successful request. It should automatically clear error state because that state should be
// used only if error is totally unrecoverable.
HitSuccess()
// HitFailure registers failed request.
HitFailure()
// TotalSucceeded returns how many requests were successful.
TotalSucceeded() uint32
// TotalFailed returns how many requests have failed.
TotalFailed() uint32
// Failed will put Counter into failed state with specific error message.
Failed(message string)
// IsFailed returns true if Counter is in failed state.
IsFailed() bool
// Message will return error message if Counter is in failed state.
Message() string
// IsFailureProcessed will return true if current error inside counter has been processed already.
IsFailureProcessed() bool
// FailureProcessed will mark current error inside Counter as processed.
FailureProcessed()
// IsCountersProcessed returns true if counters value has been processed by the checker.
// This can be used if you want to process counter values only once.
IsCountersProcessed() bool
// CountersProcessed will mark current counters value as processed.
CountersProcessed()
// ClearCountersProcessed will set IsCountersProcessed to false.
ClearCountersProcessed()
// FlushCounters will reset request counters if deemed necessary (for example, AtomicCounter will clear counters
// only if their contents are older than provided time period).
// This won't clear IsCountersProcessed flag!
FlushCounters()
}
// Processor is used to check if Counter is in error state and act accordingly.
type Processor interface {
// Process counter data. This method is not goroutine-safe!
Process(id int, counter Counter) bool
}
// NotifyMessageLocalizer is the smallest subset of core.Localizer used in the
type NotifyMessageLocalizer interface {
SetLocale(locale string)
GetLocalizedTemplateMessage(messageID string, templateData map[string]interface{}) string
}
// NotifyFunc will send notification about error to the system with provided credentials.
// It will send the notification to system admins.
type NotifyFunc func(apiURL, apiKey, msg string) error
// CounterConstructor is used to create counters.
// This way you can implement your own counter and still use default CounterStorage.
type CounterConstructor func(name string) Counter
// ConnectionDataProvider should return the connection credentials and language by counter ID.
// It's best to use account ID as a counter ID to be able to retrieve the necessary data as easy as possible.
type ConnectionDataProvider func(id int) (apiURL, apiKey, lang string, exists bool)

View File

@ -0,0 +1,13 @@
package healthcheck
import retailcrm "github.com/retailcrm/api-client-go/v2"
func DefaultNotifyFunc(apiURL, apiKey, msg string) error {
client := retailcrm.New(apiURL, apiKey)
_, err := client.NotificationsSend(retailcrm.NotificationsSendRequest{
UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins},
Type: retailcrm.NotificationTypeError,
Message: msg,
})
return err
}

View File

@ -0,0 +1,65 @@
package healthcheck
import (
"encoding/json"
"net/http"
"net/url"
"testing"
retailcrm "github.com/retailcrm/api-client-go/v2"
"github.com/retailcrm/mg-transport-core/v2/core/util/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/h2non/gock.v1"
)
func TestDefaultNotifyFunc(t *testing.T) { // nolint:paralleltest
apiURL := "https://test.retailcrm.pro"
apiKey := "key"
msg := "Notification"
data, err := json.Marshal(retailcrm.NotificationsSendRequest{
UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins},
Type: retailcrm.NotificationTypeError,
Message: msg,
})
require.NoError(t, err)
defer gock.Off()
gock.New(apiURL).
Post("/api/v5/notifications/send").
BodyString(url.Values{"notification": {string(data)}}.Encode()).
Reply(http.StatusOK).
JSON(retailcrm.SuccessfulResponse{Success: true})
assert.NoError(t, DefaultNotifyFunc(apiURL, apiKey, msg))
testutil.AssertNoUnmatchedRequests(t)
}
func TestDefaultNotifyFunc_Error(t *testing.T) { // nolint:paralleltest
apiURL := "https://test.retailcrm.pro"
apiKey := "key"
msg := "Notification"
data, err := json.Marshal(retailcrm.NotificationsSendRequest{
UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins},
Type: retailcrm.NotificationTypeError,
Message: msg,
})
require.NoError(t, err)
defer gock.Off()
gock.New(apiURL).
Post("/api/v5/notifications/send").
BodyString(url.Values{"notification": {string(data)}}.Encode()).
Reply(http.StatusForbidden).
JSON(retailcrm.ErrorResponse{
SuccessfulResponse: retailcrm.SuccessfulResponse{Success: false},
ErrorMessage: "Forbidden",
})
err = DefaultNotifyFunc(apiURL, apiKey, msg)
assert.Error(t, err)
assert.Equal(t, "Forbidden", err.Error())
testutil.AssertNoUnmatchedRequests(t)
}

View File

@ -0,0 +1,101 @@
package healthcheck
import (
"github.com/retailcrm/mg-transport-core/v2/core/logger"
)
const (
// DefaultMinRequests is a default minimal threshold of total requests. If Counter has less than
// this amount of requests total, it will be skipped because it can trigger false alerts otherwise.
DefaultMinRequests = 10
// DefaultFailureThreshold is a default value of successful requests that should be passed in order to suppress any
// error notifications. If less than that percentage of requests are successful, the notification will be sent.
DefaultFailureThreshold = 0.8
)
// CounterProcessor is a default implementation of Processor. It will try to localize the message in case of error.
type CounterProcessor struct {
Localizer NotifyMessageLocalizer
Logger logger.Logger
Notifier NotifyFunc
ConnectionDataProvider ConnectionDataProvider
Error string
FailureThreshold float64
MinRequests uint32
Debug bool
}
func (c CounterProcessor) Process(id int, counter Counter) bool { // nolint:varnamelen
if counter.IsFailed() {
if counter.IsFailureProcessed() {
c.debugLog("skipping counter id=%d because its failure is already processed", id)
return true
}
apiURL, apiKey, _, exists := c.ConnectionDataProvider(id)
if !exists {
c.debugLog("cannot find connection data for counter id=%d", id)
return true
}
err := c.Notifier(apiURL, apiKey, counter.Message())
if err != nil {
c.debugLog("cannot send notification for counter id=%d: %s (message: %s)",
id, err, counter.Message())
}
counter.FailureProcessed()
return true
}
succeeded := counter.TotalSucceeded()
failed := counter.TotalFailed()
// Ignore this counter for now because total count of requests is less than minimal count.
// The results may not be representative.
if (succeeded + failed) < c.MinRequests {
c.debugLog("skipping counter id=%d because it has fewer than %d requests", id, c.MinRequests)
return true
}
// If more than FailureThreshold % of requests are successful, don't do anything.
// Default value is 0.8 which would be 80% of successful requests.
if (float64(succeeded) / float64(succeeded+failed)) >= c.FailureThreshold {
counter.ClearCountersProcessed()
counter.FlushCounters()
return true
}
// Do not process counters values twice if error occurred.
if counter.IsCountersProcessed() {
return true
}
apiURL, apiKey, lang, exists := c.ConnectionDataProvider(id)
if !exists {
c.debugLog("cannot find connection data for counter id=%d", id)
return true
}
err := c.Notifier(apiURL, apiKey, c.getErrorText(counter.Name(), c.Error, lang))
if err != nil {
c.debugLog("cannot send notification for counter id=%d: %s (message: %s)",
id, err, counter.Message())
}
counter.CountersProcessed()
return true
}
func (c CounterProcessor) getErrorText(name, msg, lang string) string {
if c.Localizer == nil {
return msg
}
c.Localizer.SetLocale(lang)
return c.Localizer.GetLocalizedTemplateMessage(msg, map[string]interface{}{
"Name": name,
})
}
func (c CounterProcessor) debugLog(msg string, args ...interface{}) {
if c.Debug {
c.Logger.Debugf(msg, args...)
}
}

View File

@ -0,0 +1,364 @@
package healthcheck
import (
"encoding/json"
"errors"
"fmt"
"testing"
"github.com/retailcrm/mg-transport-core/v2/core/util/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)
type CounterProcessorTest struct {
suite.Suite
apiURL string
apiKey string
lang string
}
func TestCounterProcessor(t *testing.T) {
t.Parallel()
suite.Run(t, new(CounterProcessorTest))
}
func (t *CounterProcessorTest) SetupSuite() {
t.apiURL = "https://test.retailcrm.pro"
t.apiKey = "key"
t.lang = "en"
}
func (t *CounterProcessorTest) localizer() NotifyMessageLocalizer {
loc := &localizerMock{}
loc.On("SetLocale", mock.AnythingOfType("string")).Return()
loc.On("GetLocalizedTemplateMessage",
mock.AnythingOfType("string"), mock.Anything).Return(
func(msg string, tpl map[string]interface{}) string {
data, err := json.Marshal(tpl)
if err != nil {
panic(err)
}
return fmt.Sprintf("%s [%s]", msg, string(data))
})
return loc
}
func (t *CounterProcessorTest) new(
nf NotifyFunc, pr ConnectionDataProvider, noLocalizer ...bool) (Processor, testutil.BufferedLogger) {
loc := t.localizer()
if len(noLocalizer) > 0 && noLocalizer[0] {
loc = nil
}
log := testutil.NewBufferedLogger()
return CounterProcessor{
Localizer: loc,
Logger: log,
Notifier: nf,
ConnectionDataProvider: pr,
Error: "default error",
FailureThreshold: DefaultFailureThreshold,
MinRequests: DefaultMinRequests,
Debug: true,
}, log
}
func (t *CounterProcessorTest) notifier(err ...error) *notifierMock {
if len(err) > 0 && err[0] != nil {
return &notifierMock{err: err[0]}
}
return &notifierMock{}
}
func (t *CounterProcessorTest) provider(notFound ...bool) ConnectionDataProvider {
if len(notFound) > 0 && notFound[0] {
return func(id int) (apiURL, apiKey, lang string, exists bool) {
return "", "", "", false
}
}
return func(id int) (apiURL, apiKey, lang string, exists bool) {
return t.apiURL, t.apiKey, t.lang, true
}
}
func (t *CounterProcessorTest) counter() mockedCounter {
return &counterMock{}
}
func (t *CounterProcessorTest) Test_FailureProcessed() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(true)
c.On("IsFailureProcessed").Return(true)
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Contains(log.String(), "skipping counter id=1 because its failure is already processed")
}
func (t *CounterProcessorTest) Test_CounterFailed_CannotFindConnection() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider(true))
c := t.counter()
c.On("IsFailed").Return(true)
c.On("IsFailureProcessed").Return(false)
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Contains(log.String(), "cannot find connection data for counter id=1")
}
func (t *CounterProcessorTest) Test_CounterFailed_ErrWhileNotifying() {
n := t.notifier(errors.New("http status code: 500"))
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(true)
c.On("IsFailureProcessed").Return(false)
c.On("Message").Return("error message")
c.On("FailureProcessed").Return()
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Contains(log.String(), "cannot send notification for counter id=1: http status code: 500 (message: error message)")
t.Assert().Equal(t.apiURL, n.apiURL)
t.Assert().Equal(t.apiKey, n.apiKey)
t.Assert().Equal("error message", n.message)
}
func (t *CounterProcessorTest) Test_CounterFailed_SentNotification() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(true)
c.On("IsFailureProcessed").Return(false)
c.On("Message").Return("error message")
c.On("FailureProcessed").Return()
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Empty(log.String())
t.Assert().Equal(t.apiURL, n.apiURL)
t.Assert().Equal(t.apiKey, n.apiKey)
t.Assert().Equal("error message", n.message)
}
func (t *CounterProcessorTest) Test_TooFewRequests() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(0))
c.On("TotalSucceeded").Return(uint32(DefaultMinRequests - 1))
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Contains(log.String(),
fmt.Sprintf("skipping counter id=%d because it has fewer than %d requests", 1, DefaultMinRequests))
}
func (t *CounterProcessorTest) Test_ThresholdNotPassed() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(20))
c.On("TotalSucceeded").Return(uint32(80))
c.On("ClearCountersProcessed").Return()
c.On("FlushCounters").Return()
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Empty(log.String())
t.Assert().Empty(n.message)
}
func (t *CounterProcessorTest) Test_ThresholdPassed_AlreadyProcessed() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(21))
c.On("TotalSucceeded").Return(uint32(79))
c.On("IsCountersProcessed").Return(true)
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Empty(log.String())
t.Assert().Empty(n.message)
}
func (t *CounterProcessorTest) Test_ThresholdPassed_NoConnectionFound() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider(true))
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(21))
c.On("TotalSucceeded").Return(uint32(79))
c.On("IsCountersProcessed").Return(false)
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Contains(log.String(), "cannot find connection data for counter id=1")
t.Assert().Empty(n.message)
}
func (t *CounterProcessorTest) Test_ThresholdPassed_NotifyingError() {
n := t.notifier(errors.New("unknown error"))
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(21))
c.On("TotalSucceeded").Return(uint32(79))
c.On("IsCountersProcessed").Return(false)
c.On("Name").Return("MockedCounter")
c.On("Message").Return("")
c.On("CountersProcessed").Return()
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Contains(log.String(), "cannot send notification for counter id=1: unknown error (message: )")
t.Assert().Equal(`default error [{"Name":"MockedCounter"}]`, n.message)
}
func (t *CounterProcessorTest) Test_ThresholdPassed_NotificationSent() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider())
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(21))
c.On("TotalSucceeded").Return(uint32(79))
c.On("IsCountersProcessed").Return(false)
c.On("Name").Return("MockedCounter")
c.On("CountersProcessed").Return()
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Empty(log.String())
t.Assert().Equal(`default error [{"Name":"MockedCounter"}]`, n.message)
}
func (t *CounterProcessorTest) Test_ThresholdPassed_NotificationSent_NoLocalizer() {
n := t.notifier()
p, log := t.new(n.Notify, t.provider(), true)
c := t.counter()
c.On("IsFailed").Return(false)
c.On("TotalFailed").Return(uint32(21))
c.On("TotalSucceeded").Return(uint32(79))
c.On("IsCountersProcessed").Return(false)
c.On("Name").Return("MockedCounter")
c.On("CountersProcessed").Return()
p.Process(1, c)
c.AssertExpectations(t.T())
t.Assert().Empty(log.String())
t.Assert().Equal(`default error`, n.message)
}
type localizerMock struct {
mock.Mock
}
func (l *localizerMock) SetLocale(lang string) {
l.Called(lang)
}
func (l *localizerMock) GetLocalizedTemplateMessage(messageID string, templateData map[string]interface{}) string {
args := l.Called(messageID, templateData)
if fn, ok := args.Get(0).(func(string, map[string]interface{}) string); ok {
return fn(messageID, templateData)
}
return args.String(0)
}
type mockedCounter interface {
Counter
On(methodName string, arguments ...interface{}) *mock.Call
AssertExpectations(t mock.TestingT) bool
}
type counterMock struct {
mock.Mock
}
func (cm *counterMock) Name() string {
args := cm.Called()
return args.String(0)
}
func (cm *counterMock) SetName(name string) {
cm.Called(name)
}
func (cm *counterMock) HitSuccess() {
cm.Called()
}
func (cm *counterMock) HitFailure() {
cm.Called()
}
func (cm *counterMock) TotalSucceeded() uint32 {
args := cm.Called()
return args.Get(0).(uint32)
}
func (cm *counterMock) TotalFailed() uint32 {
args := cm.Called()
return args.Get(0).(uint32)
}
func (cm *counterMock) Message() string {
args := cm.Called()
return args.String(0)
}
func (cm *counterMock) IsFailed() bool {
args := cm.Called()
return args.Bool(0)
}
func (cm *counterMock) Failed(message string) {
cm.Called(message)
}
func (cm *counterMock) IsFailureProcessed() bool {
args := cm.Called()
return args.Bool(0)
}
func (cm *counterMock) IsCountersProcessed() bool {
args := cm.Called()
return args.Bool(0)
}
func (cm *counterMock) FailureProcessed() {
cm.Called()
}
func (cm *counterMock) CountersProcessed() {
cm.Called()
}
func (cm *counterMock) ClearCountersProcessed() {
cm.Called()
}
func (cm *counterMock) FlushCounters() {
cm.Called()
}
type notifierMock struct {
err error
apiURL string
apiKey string
message string
}
func (n *notifierMock) Notify(apiURL, apiKey, msg string) error {
n.apiURL = apiURL
n.apiKey = apiKey
n.message = msg
return n.err
}

View File

@ -0,0 +1,38 @@
package healthcheck
import "sync"
// SyncMapStorage is a default Storage implementation. It uses sync.Map under the hood because
// deletions should be rare for the storage. If your business logic calls Remove often, it would be better
// to use your own implementation with map[int]Counter and sync.RWMutex.
type SyncMapStorage struct {
constructor CounterConstructor
m sync.Map
}
// NewSyncMapStorage is a SyncMapStorage constructor.
func NewSyncMapStorage(constructor CounterConstructor) Storage {
return &SyncMapStorage{constructor: constructor}
}
func (s *SyncMapStorage) Get(id int, name string) Counter {
val, found := s.m.Load(id)
if found {
counter := val.(Counter)
counter.SetName(name)
return val.(Counter)
}
c := s.constructor(name)
s.m.Store(id, c)
return c
}
func (s *SyncMapStorage) Remove(id int) {
s.m.Delete(id)
}
func (s *SyncMapStorage) Process(proc Processor) {
s.m.Range(func(key, value any) bool {
return proc.Process(key.(int), value.(Counter))
})
}

View File

@ -0,0 +1,69 @@
package healthcheck
import (
"sync"
"testing"
"github.com/stretchr/testify/suite"
)
type SyncMapStorageTest struct {
suite.Suite
storage Storage
}
func TestSyncMapStorage(t *testing.T) {
t.Parallel()
suite.Run(t, new(SyncMapStorageTest))
}
func (t *SyncMapStorageTest) SetupSuite() {
t.storage = NewSyncMapStorage(NewAtomicCounter)
}
func (t *SyncMapStorageTest) Test_Get() {
counter := t.storage.Get(1, "Name")
t.Assert().NotNil(counter)
t.Assert().IsType(&AtomicCounter{}, counter)
t.Assert().Equal("Name", counter.Name())
newCounter := t.storage.Get(1, "New Name")
t.Assert().Equal(counter, newCounter)
t.Assert().Equal("New Name", newCounter.Name())
}
func (t *SyncMapStorageTest) Test_Process() {
var wg sync.WaitGroup
wg.Add(1)
t.storage.Process(storageCallbackProcessor{callback: func(id int, counter Counter) bool {
t.Assert().Equal(1, id)
t.Assert().Equal("New Name", counter.Name())
wg.Done()
return false
}})
wg.Wait()
}
func (t *SyncMapStorageTest) Test_Remove() {
defer func() {
if r := recover(); r != nil {
t.Fail("unexpected panic:", r)
}
}()
t.storage.Remove(0)
t.storage.Remove(-1)
t.storage.Remove(1)
t.storage.Process(storageCallbackProcessor{callback: func(id int, counter Counter) bool {
t.Fail("did not expect any items:", id, counter)
return false
}})
}
type storageCallbackProcessor struct {
callback func(id int, counter Counter) bool
}
func (p storageCallbackProcessor) Process(id int, counter Counter) bool {
return p.callback(id, counter)
}

View File

@ -12,6 +12,15 @@ import (
// JobFunc is empty func which should be executed in a parallel goroutine.
type JobFunc func(logger.Logger) error
// JobAfterCallback will be called after specific job is done.
// This can be used to run something after asynchronous job is done. The function will
// receive an error from the job which can be used to alter the callback behavior. If callback
// returns an error, it will be processed using job ErrorHandler.
// The callback won't be executed in case of panic. Panicked callback will be show in logs
// as if the job itself panicked, ergo, do not write jobs or callbacks that can panic,
// or process the panic by yourself.
type JobAfterCallback func(jobError error, log logger.Logger) error
// JobErrorHandler is a function to handle jobs errors. First argument is a job name.
type JobErrorHandler func(string, error, logger.Logger)
@ -32,21 +41,22 @@ type Job struct {
// JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as
// singleton), or jobs can be executed as regular jobs. Example initialization:
// manager := NewJobManager().
// SetLogger(logger).
// SetLogging(false)
// _ = manager.RegisterJob("updateTokens", &Job{
// Command: func(log logger.Logger) error {
// // logic goes here...
// logger.Info("All tokens were updated successfully")
// return nil
// },
// ErrorHandler: DefaultJobErrorHandler(),
// PanicHandler: DefaultJobPanicHandler(),
// Interval: time.Hour * 3,
// Regular: true,
// })
// manager.Start()
//
// manager := NewJobManager().
// SetLogger(logger).
// SetLogging(false)
// _ = manager.RegisterJob("updateTokens", &Job{
// Command: func(log logger.Logger) error {
// // logic goes here...
// logger.Info("All tokens were updated successfully")
// return nil
// },
// ErrorHandler: DefaultJobErrorHandler(),
// PanicHandler: DefaultJobPanicHandler(),
// Interval: time.Hour * 3,
// Regular: true,
// })
// manager.Start()
type JobManager struct {
logger logger.Logger
nilLogger logger.Logger
@ -55,17 +65,24 @@ type JobManager struct {
}
// getWrappedFunc wraps job into function.
func (j *Job) getWrappedFunc(name string, log logger.Logger) func() {
return func() {
func (j *Job) getWrappedFunc(name string, log logger.Logger) func(callback JobAfterCallback) {
return func(callback JobAfterCallback) {
defer func() {
if r := recover(); r != nil && j.PanicHandler != nil {
j.PanicHandler(name, r, log)
}
}()
if err := j.Command(log); err != nil && j.ErrorHandler != nil {
err := j.Command(log)
if err != nil && j.ErrorHandler != nil {
j.ErrorHandler(name, err, log)
}
if callback != nil {
err := callback(err, log)
if j.ErrorHandler != nil {
j.ErrorHandler(name, err, log)
}
}
}
}
@ -77,7 +94,7 @@ func (j *Job) getWrappedTimerFunc(name string, log logger.Logger) func(chan bool
case <-stopChannel:
return
default:
j.getWrappedFunc(name, log)()
j.getWrappedFunc(name, log)(nil)
}
}
}
@ -118,13 +135,13 @@ func (j *Job) stop() {
}
// runOnce run job once.
func (j *Job) runOnce(name string, log logger.Logger) {
go j.getWrappedFunc(name, log)()
func (j *Job) runOnce(name string, log logger.Logger, callback JobAfterCallback) {
go j.getWrappedFunc(name, log)(callback)
}
// runOnceSync run job once in current goroutine.
func (j *Job) runOnceSync(name string, log logger.Logger) {
j.getWrappedFunc(name, log)()
j.getWrappedFunc(name, log)(nil)
}
// NewJobManager is a JobManager constructor.
@ -241,15 +258,52 @@ func (j *JobManager) StopJob(name string) error {
}
// RunJobOnce starts provided job once if it exists. It's also async.
func (j *JobManager) RunJobOnce(name string) error {
func (j *JobManager) RunJobOnce(name string, callback ...JobAfterCallback) error {
if job, ok := j.FetchJob(name); ok {
job.runOnce(name, j.Logger())
var cb JobAfterCallback
if len(callback) > 0 {
cb = callback[0]
}
job.runOnce(name, j.Logger(), cb)
return nil
}
return fmt.Errorf("cannot find job `%s`", name)
}
// RunJobsOnceSequentially will execute provided jobs asynchronously. It uses JobAfterCallback under the hood.
// You can prevent subsequent jobs from running using stopOnError flag.
func (j *JobManager) RunJobsOnceSequentially(names []string, stopOnError bool) error {
if len(names) == 0 {
return nil
}
var chained JobAfterCallback
for i := len(names) - 1; i > 0; i-- {
i := i
if chained == nil {
chained = func(jobError error, log logger.Logger) error {
if jobError != nil && stopOnError {
return jobError
}
return j.RunJobOnce(names[i])
}
continue
}
oldCallback := chained
chained = func(jobError error, log logger.Logger) error {
if jobError != nil && stopOnError {
return jobError
}
err := j.RunJobOnce(names[i], oldCallback)
return err
}
}
return j.RunJobOnce(names[0], chained)
}
// RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work.
func (j *JobManager) RunJobOnceSync(name string) error {
if job, ok := j.FetchJob(name); ok {

View File

@ -293,7 +293,7 @@ func (t *JobTest) Test_getWrappedFunc() {
t.onceJob()
fn := t.job.getWrappedFunc("job", t.testLogger())
require.NotNil(t.T(), fn)
go fn()
go fn(nil)
assert.True(t.T(), t.executed())
assert.False(t.T(), t.errored(time.Millisecond))
assert.False(t.T(), t.panicked(time.Millisecond))
@ -308,7 +308,7 @@ func (t *JobTest) Test_getWrappedFuncError() {
t.onceErrorJob()
fn := t.job.getWrappedFunc("job", t.testLogger())
require.NotNil(t.T(), fn)
go fn()
go fn(nil)
assert.True(t.T(), t.executed())
assert.True(t.T(), t.errored(time.Millisecond))
assert.False(t.T(), t.panicked(time.Millisecond))
@ -323,7 +323,7 @@ func (t *JobTest) Test_getWrappedFuncPanic() {
t.oncePanicJob()
fn := t.job.getWrappedFunc("job", t.testLogger())
require.NotNil(t.T(), fn)
go fn()
go fn(nil)
assert.True(t.T(), t.executed())
assert.False(t.T(), t.errored(time.Millisecond))
assert.True(t.T(), t.panicked(time.Millisecond))
@ -347,7 +347,7 @@ func (t *JobTest) Test_runOnce() {
}()
t.regularJob()
t.job.runOnce("job", t.testLogger())
t.job.runOnce("job", t.testLogger(), nil)
time.Sleep(time.Millisecond * 5)
require.True(t.T(), t.executed())
first := 0

62
go.mod
View File

@ -1,43 +1,79 @@
module github.com/retailcrm/mg-transport-core/v2
go 1.12
go 1.18
require (
cloud.google.com/go v0.60.0 // indirect
github.com/DATA-DOG/go-sqlmock v1.3.3
github.com/aws/aws-sdk-go v1.36.30
github.com/aws/aws-sdk-go-v2 v1.17.2
github.com/aws/aws-sdk-go-v2/config v1.18.4
github.com/aws/aws-sdk-go-v2/service/s3 v1.29.5
github.com/blacked/go-zabbix v0.0.0-20170118040903-3c6a95ec4fdc
github.com/denisenkom/go-mssqldb v0.0.0-20190830225923-3302f0226fbd // indirect
github.com/getsentry/sentry-go v0.12.0
github.com/gin-contrib/multitemplate v0.0.0-20190914010127-bba2ccfe37ec
github.com/gin-gonic/gin v1.7.2
github.com/go-playground/validator/v10 v10.8.0
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gomarkdown/markdown v0.0.0-20221013030248-663e2500819c
github.com/gorilla/securecookie v1.1.1
github.com/gorilla/sessions v1.2.0
github.com/jessevdk/go-flags v1.4.0
github.com/jinzhu/gorm v1.9.11
github.com/json-iterator/go v1.1.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.9.0 // indirect
github.com/nicksnyder/go-i18n/v2 v2.0.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/pkg/errors v0.9.1
github.com/retailcrm/api-client-go/v2 v2.0.12
github.com/retailcrm/api-client-go/v2 v2.1.3
github.com/retailcrm/mg-transport-api-client-go v1.1.32
github.com/retailcrm/zabbix-metrics-collector v1.0.0
github.com/stretchr/testify v1.8.1
github.com/ugorji/go v1.2.6 // indirect
go.uber.org/atomic v1.10.0
golang.org/x/text v0.3.7
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/gormigrate.v1 v1.6.0
gopkg.in/h2non/gock.v1 v1.1.2
gopkg.in/yaml.v2 v2.4.0
)
require (
cloud.google.com/go v0.60.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20190830225923-3302f0226fbd // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lib/pq v1.9.0 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/ugorji/go/codec v1.2.6 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

8
go.sum
View File

@ -361,8 +361,8 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailcrm/api-client-go/v2 v2.0.12 h1:0pMDsz4fzKpizUxoF1qtgwN+oJ+YJruncCj6OTOB3e8=
github.com/retailcrm/api-client-go/v2 v2.0.12/go.mod h1:1yTZl9+gd3+/k0kAJe7sYvC+mL4fqMwIwtnSgSWZlkQ=
github.com/retailcrm/api-client-go/v2 v2.1.3 h1:AVcp9oeSOm6+3EWXCgdQs+XE3PTjzCKKB//MUAe0Zb0=
github.com/retailcrm/api-client-go/v2 v2.1.3/go.mod h1:1yTZl9+gd3+/k0kAJe7sYvC+mL4fqMwIwtnSgSWZlkQ=
github.com/retailcrm/mg-transport-api-client-go v1.1.32 h1:IBPltSoD5q2PPZJbNC/prK5F9rEVPXVx/ZzDpi7HKhs=
github.com/retailcrm/mg-transport-api-client-go v1.1.32/go.mod h1:AWV6BueE28/6SCoyfKURTo4lF0oXYoOKmHTzehd5vAI=
github.com/retailcrm/zabbix-metrics-collector v1.0.0 h1:ju3rhpgVoiKII6oXEJEf2eoJy5bNcYAmOPRp1oPWDmA=
@ -385,6 +385,7 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@ -397,7 +398,6 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.2.6 h1:tGiWC9HENWE2tqYycIqFTNorMmFRVhNwCpDOpWqnk8E=
github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
@ -424,6 +424,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=