diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e0aa631..62dcf07 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,11 +19,10 @@ jobs: steps: - name: Check out code into the Go module directory uses: actions/checkout@v2 - - name: Set up Go 1.17 - uses: actions/setup-go@v2 + - name: Set up stable Go version + uses: actions/setup-go@v3 with: - # TODO: Should migrate to 1.18 later - go-version: '1.17' + go-version: 'stable' - name: Get dependencies run: go mod tidy - name: Lint code with golangci-lint @@ -36,10 +35,10 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: ['1.16', '1.17', '1.18'] + go-version: ['1.18', '1.19', 'stable'] steps: - name: Set up Go ${{ matrix.go-version }} - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: go-version: ${{ matrix.go-version }} - name: Check out code into the Go module directory diff --git a/core/healthcheck/counter.go b/core/healthcheck/counter.go new file mode 100644 index 0000000..3a8c2dd --- /dev/null +++ b/core/healthcheck/counter.go @@ -0,0 +1,107 @@ +package healthcheck + +import ( + "time" + + "go.uber.org/atomic" +) + +// DefaultResetPeriod is a default period for AtomicCounter after which internal request counters will be reset. +const DefaultResetPeriod = time.Minute * 15 + +// AtomicCounter is a default Counter implementation. +// It uses atomics under the hood (hence the name) and can be configured with custom reset timeout and +type AtomicCounter struct { + name atomic.String + msg atomic.String + timestamp atomic.Time + resetPeriod time.Duration + failure atomic.Uint32 + failed atomic.Bool + failureProcessed atomic.Bool + countersProcessed atomic.Bool + success atomic.Uint32 +} + +// NewAtomicCounterWithPeriod returns AtomicCounter configured with provided period. +func NewAtomicCounterWithPeriod(name string, resetPeriod time.Duration) Counter { + c := &AtomicCounter{} + c.SetName(name) + c.resetPeriod = resetPeriod + c.timestamp.Store(time.Now()) + return c +} + +// NewAtomicCounter returns AtomicCounter with DefaultResetPeriod. +func NewAtomicCounter(name string) Counter { + return NewAtomicCounterWithPeriod(name, DefaultResetPeriod) +} + +func (a *AtomicCounter) Name() string { + return a.name.Load() +} + +func (a *AtomicCounter) SetName(name string) { + a.name.Store(name) +} + +func (a *AtomicCounter) HitSuccess() { + a.success.Add(1) + if a.failed.CompareAndSwap(true, false) { + a.failureProcessed.Store(false) + a.msg.Store("") + } +} + +func (a *AtomicCounter) HitFailure() { + a.failure.Add(1) +} + +func (a *AtomicCounter) TotalSucceeded() uint32 { + return a.success.Load() +} + +func (a *AtomicCounter) TotalFailed() uint32 { + return a.failure.Load() +} + +func (a *AtomicCounter) Failed(message string) { + a.msg.Store(message) + a.failed.Store(true) +} + +func (a *AtomicCounter) IsFailed() bool { + return a.failed.Load() +} + +func (a *AtomicCounter) Message() string { + return a.msg.Load() +} + +func (a *AtomicCounter) IsFailureProcessed() bool { + return a.failureProcessed.Load() +} + +func (a *AtomicCounter) FailureProcessed() { + a.failureProcessed.Store(true) +} + +func (a *AtomicCounter) IsCountersProcessed() bool { + return a.countersProcessed.Load() +} + +func (a *AtomicCounter) CountersProcessed() { + a.countersProcessed.Store(true) +} + +func (a *AtomicCounter) ClearCountersProcessed() { + a.countersProcessed.Store(false) +} + +func (a *AtomicCounter) FlushCounters() { + if time.Now().After(a.timestamp.Load().Add(time.Minute * 15)) { + a.timestamp.Store(time.Now()) + a.success.Store(0) + a.failure.Store(0) + } +} diff --git a/core/healthcheck/counter_test.go b/core/healthcheck/counter_test.go new file mode 100644 index 0000000..982bb96 --- /dev/null +++ b/core/healthcheck/counter_test.go @@ -0,0 +1,123 @@ +package healthcheck + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type AtomicCounterTest struct { + suite.Suite +} + +func TestAtomicCounter(t *testing.T) { + t.Parallel() + suite.Run(t, new(AtomicCounterTest)) +} + +func (t *AtomicCounterTest) new() Counter { + return NewAtomicCounter("test") +} + +func (t *AtomicCounterTest) Test_Name() { + t.Assert().Equal("test", t.new().Name()) +} + +func (t *AtomicCounterTest) Test_SetName() { + c := t.new() + c.SetName("new") + t.Assert().Equal("new", c.Name()) +} + +func (t *AtomicCounterTest) Test_HitSuccess() { + c := t.new() + c.HitSuccess() + t.Assert().Equal(uint32(1), c.TotalSucceeded()) + + c.Failed("test") + c.FailureProcessed() + c.HitSuccess() + t.Assert().Equal(uint32(2), c.TotalSucceeded()) + t.Assert().False(c.IsFailed()) + t.Assert().False(c.IsFailureProcessed()) + t.Assert().Equal("", c.Message()) +} + +func (t *AtomicCounterTest) Test_HitFailure() { + c := t.new() + c.HitFailure() + t.Assert().Equal(uint32(1), c.TotalFailed()) + c.HitFailure() + t.Assert().Equal(uint32(2), c.TotalFailed()) +} + +func (t *AtomicCounterTest) Test_Failed() { + c := t.new() + t.Require().False(c.IsFailed()) + t.Require().Equal("", c.Message()) + + c.Failed("message") + t.Assert().True(c.IsFailed()) + t.Assert().Equal("message", c.Message()) +} + +func (t *AtomicCounterTest) Test_CountersProcessed() { + c := t.new() + t.Require().False(c.IsCountersProcessed()) + + c.CountersProcessed() + t.Assert().True(c.IsCountersProcessed()) + + c.ClearCountersProcessed() + t.Assert().False(c.IsCountersProcessed()) +} + +func (t *AtomicCounterTest) Test_FlushCounters() { + c := t.new() + c.HitSuccess() + t.Require().Equal(uint32(1), c.TotalSucceeded()) + + c.FlushCounters() + t.Assert().Equal(uint32(1), c.TotalSucceeded()) + + c.(*AtomicCounter).timestamp.Store(time.Now().Add(-(DefaultResetPeriod + time.Second))) + c.FlushCounters() + t.Assert().Equal(uint32(0), c.TotalSucceeded()) +} + +func (t *AtomicCounterTest) Test_Concurrency() { + c := t.new() + var wg sync.WaitGroup + wg.Add(2) + go func() { + for i := 0; i < 1000; i++ { + c.HitSuccess() + } + wg.Done() + }() + go func() { + for i := 0; i < 500; i++ { + // this delay will ensure that failure is being called after success. + // technically, both have been executed concurrently because first 399 calls will not be delayed. + if i > 399 { + time.Sleep(time.Microsecond) + } + if i > 400 { + c.Failed("total failure") + continue + } + c.HitFailure() + } + c.FailureProcessed() + wg.Done() + }() + wg.Wait() + + t.Assert().Equal(uint32(1000), c.TotalSucceeded()) + t.Assert().Equal(uint32(401), c.TotalFailed()) + t.Assert().True(c.IsFailed()) + t.Assert().True(c.IsFailureProcessed()) + t.Assert().Equal("total failure", c.Message()) +} diff --git a/core/healthcheck/iface.go b/core/healthcheck/iface.go new file mode 100644 index 0000000..d5cd195 --- /dev/null +++ b/core/healthcheck/iface.go @@ -0,0 +1,87 @@ +package healthcheck + +var ( + // compile-time checks to ensure that implementations are compatible with the interface + _ = Storage(&SyncMapStorage{}) + _ = Counter(&AtomicCounter{}) + _ = Processor(CounterProcessor{}) + _ = NotifyFunc(DefaultNotifyFunc) + _ = CounterConstructor(NewAtomicCounter) +) + +// Storage stores different instances of Counter. Implementation should be goroutine-safe. +type Storage interface { + // Get counter by its ID. The counter will be instantiated automatically if necessary. + // Name here is not used to identify the counter in the storage. + Get(id int, name string) Counter + // Remove counter if it exists. + Remove(id int) + // Process will iterate over counters and call Processor on each of them. + // This method is used to collect counters data & send notifications. + Process(processor Processor) +} + +// Counter will count successful and failed requests. Its contents can be used +// to judge if specific entity (e.g. Connection / Account) is not working properly +// (invalid credentials, too many failed requests, etc) and take further action based on the result. +// Implementation should be goroutine-safe. +type Counter interface { + // Name can be used as a more friendly identifier for the counter. + Name() string + // SetName of the counter. + SetName(name string) + // HitSuccess registers successful request. It should automatically clear error state because that state should be + // used only if error is totally unrecoverable. + HitSuccess() + // HitFailure registers failed request. + HitFailure() + // TotalSucceeded returns how many requests were successful. + TotalSucceeded() uint32 + // TotalFailed returns how many requests have failed. + TotalFailed() uint32 + // Failed will put Counter into failed state with specific error message. + Failed(message string) + // IsFailed returns true if Counter is in failed state. + IsFailed() bool + // Message will return error message if Counter is in failed state. + Message() string + // IsFailureProcessed will return true if current error inside counter has been processed already. + IsFailureProcessed() bool + // FailureProcessed will mark current error inside Counter as processed. + FailureProcessed() + // IsCountersProcessed returns true if counters value has been processed by the checker. + // This can be used if you want to process counter values only once. + IsCountersProcessed() bool + // CountersProcessed will mark current counters value as processed. + CountersProcessed() + // ClearCountersProcessed will set IsCountersProcessed to false. + ClearCountersProcessed() + // FlushCounters will reset request counters if deemed necessary (for example, AtomicCounter will clear counters + // only if their contents are older than provided time period). + // This won't clear IsCountersProcessed flag! + FlushCounters() +} + +// Processor is used to check if Counter is in error state and act accordingly. +type Processor interface { + // Process counter data. This method is not goroutine-safe! + Process(id int, counter Counter) bool +} + +// NotifyMessageLocalizer is the smallest subset of core.Localizer used in the +type NotifyMessageLocalizer interface { + SetLocale(locale string) + GetLocalizedTemplateMessage(messageID string, templateData map[string]interface{}) string +} + +// NotifyFunc will send notification about error to the system with provided credentials. +// It will send the notification to system admins. +type NotifyFunc func(apiURL, apiKey, msg string) error + +// CounterConstructor is used to create counters. +// This way you can implement your own counter and still use default CounterStorage. +type CounterConstructor func(name string) Counter + +// ConnectionDataProvider should return the connection credentials and language by counter ID. +// It's best to use account ID as a counter ID to be able to retrieve the necessary data as easy as possible. +type ConnectionDataProvider func(id int) (apiURL, apiKey, lang string, exists bool) diff --git a/core/healthcheck/notifier.go b/core/healthcheck/notifier.go new file mode 100644 index 0000000..8fe3859 --- /dev/null +++ b/core/healthcheck/notifier.go @@ -0,0 +1,13 @@ +package healthcheck + +import retailcrm "github.com/retailcrm/api-client-go/v2" + +func DefaultNotifyFunc(apiURL, apiKey, msg string) error { + client := retailcrm.New(apiURL, apiKey) + _, err := client.NotificationsSend(retailcrm.NotificationsSendRequest{ + UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins}, + Type: retailcrm.NotificationTypeError, + Message: msg, + }) + return err +} diff --git a/core/healthcheck/notifier_test.go b/core/healthcheck/notifier_test.go new file mode 100644 index 0000000..eb7b415 --- /dev/null +++ b/core/healthcheck/notifier_test.go @@ -0,0 +1,65 @@ +package healthcheck + +import ( + "encoding/json" + "net/http" + "net/url" + "testing" + + retailcrm "github.com/retailcrm/api-client-go/v2" + "github.com/retailcrm/mg-transport-core/v2/core/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/h2non/gock.v1" +) + +func TestDefaultNotifyFunc(t *testing.T) { // nolint:paralleltest + apiURL := "https://test.retailcrm.pro" + apiKey := "key" + msg := "Notification" + + data, err := json.Marshal(retailcrm.NotificationsSendRequest{ + UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins}, + Type: retailcrm.NotificationTypeError, + Message: msg, + }) + require.NoError(t, err) + + defer gock.Off() + gock.New(apiURL). + Post("/api/v5/notifications/send"). + BodyString(url.Values{"notification": {string(data)}}.Encode()). + Reply(http.StatusOK). + JSON(retailcrm.SuccessfulResponse{Success: true}) + + assert.NoError(t, DefaultNotifyFunc(apiURL, apiKey, msg)) + testutil.AssertNoUnmatchedRequests(t) +} + +func TestDefaultNotifyFunc_Error(t *testing.T) { // nolint:paralleltest + apiURL := "https://test.retailcrm.pro" + apiKey := "key" + msg := "Notification" + + data, err := json.Marshal(retailcrm.NotificationsSendRequest{ + UserGroups: []retailcrm.UserGroupType{retailcrm.UserGroupSuperadmins}, + Type: retailcrm.NotificationTypeError, + Message: msg, + }) + require.NoError(t, err) + + defer gock.Off() + gock.New(apiURL). + Post("/api/v5/notifications/send"). + BodyString(url.Values{"notification": {string(data)}}.Encode()). + Reply(http.StatusForbidden). + JSON(retailcrm.ErrorResponse{ + SuccessfulResponse: retailcrm.SuccessfulResponse{Success: false}, + ErrorMessage: "Forbidden", + }) + + err = DefaultNotifyFunc(apiURL, apiKey, msg) + assert.Error(t, err) + assert.Equal(t, "Forbidden", err.Error()) + testutil.AssertNoUnmatchedRequests(t) +} diff --git a/core/healthcheck/processor.go b/core/healthcheck/processor.go new file mode 100644 index 0000000..fc11ccd --- /dev/null +++ b/core/healthcheck/processor.go @@ -0,0 +1,101 @@ +package healthcheck + +import ( + "github.com/retailcrm/mg-transport-core/v2/core/logger" +) + +const ( + // DefaultMinRequests is a default minimal threshold of total requests. If Counter has less than + // this amount of requests total, it will be skipped because it can trigger false alerts otherwise. + DefaultMinRequests = 10 + + // DefaultFailureThreshold is a default value of successful requests that should be passed in order to suppress any + // error notifications. If less than that percentage of requests are successful, the notification will be sent. + DefaultFailureThreshold = 0.8 +) + +// CounterProcessor is a default implementation of Processor. It will try to localize the message in case of error. +type CounterProcessor struct { + Localizer NotifyMessageLocalizer + Logger logger.Logger + Notifier NotifyFunc + ConnectionDataProvider ConnectionDataProvider + Error string + FailureThreshold float64 + MinRequests uint32 + Debug bool +} + +func (c CounterProcessor) Process(id int, counter Counter) bool { // nolint:varnamelen + if counter.IsFailed() { + if counter.IsFailureProcessed() { + c.debugLog("skipping counter id=%d because its failure is already processed", id) + return true + } + + apiURL, apiKey, _, exists := c.ConnectionDataProvider(id) + if !exists { + c.debugLog("cannot find connection data for counter id=%d", id) + return true + } + err := c.Notifier(apiURL, apiKey, counter.Message()) + if err != nil { + c.debugLog("cannot send notification for counter id=%d: %s (message: %s)", + id, err, counter.Message()) + } + counter.FailureProcessed() + return true + } + + succeeded := counter.TotalSucceeded() + failed := counter.TotalFailed() + + // Ignore this counter for now because total count of requests is less than minimal count. + // The results may not be representative. + if (succeeded + failed) < c.MinRequests { + c.debugLog("skipping counter id=%d because it has fewer than %d requests", id, c.MinRequests) + return true + } + + // If more than FailureThreshold % of requests are successful, don't do anything. + // Default value is 0.8 which would be 80% of successful requests. + if (float64(succeeded) / float64(succeeded+failed)) >= c.FailureThreshold { + counter.ClearCountersProcessed() + counter.FlushCounters() + return true + } + + // Do not process counters values twice if error occurred. + if counter.IsCountersProcessed() { + return true + } + + apiURL, apiKey, lang, exists := c.ConnectionDataProvider(id) + if !exists { + c.debugLog("cannot find connection data for counter id=%d", id) + return true + } + err := c.Notifier(apiURL, apiKey, c.getErrorText(counter.Name(), c.Error, lang)) + if err != nil { + c.debugLog("cannot send notification for counter id=%d: %s (message: %s)", + id, err, counter.Message()) + } + counter.CountersProcessed() + return true +} + +func (c CounterProcessor) getErrorText(name, msg, lang string) string { + if c.Localizer == nil { + return msg + } + c.Localizer.SetLocale(lang) + return c.Localizer.GetLocalizedTemplateMessage(msg, map[string]interface{}{ + "Name": name, + }) +} + +func (c CounterProcessor) debugLog(msg string, args ...interface{}) { + if c.Debug { + c.Logger.Debugf(msg, args...) + } +} diff --git a/core/healthcheck/processor_test.go b/core/healthcheck/processor_test.go new file mode 100644 index 0000000..9774310 --- /dev/null +++ b/core/healthcheck/processor_test.go @@ -0,0 +1,364 @@ +package healthcheck + +import ( + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/retailcrm/mg-transport-core/v2/core/util/testutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type CounterProcessorTest struct { + suite.Suite + apiURL string + apiKey string + lang string +} + +func TestCounterProcessor(t *testing.T) { + t.Parallel() + suite.Run(t, new(CounterProcessorTest)) +} + +func (t *CounterProcessorTest) SetupSuite() { + t.apiURL = "https://test.retailcrm.pro" + t.apiKey = "key" + t.lang = "en" +} + +func (t *CounterProcessorTest) localizer() NotifyMessageLocalizer { + loc := &localizerMock{} + loc.On("SetLocale", mock.AnythingOfType("string")).Return() + loc.On("GetLocalizedTemplateMessage", + mock.AnythingOfType("string"), mock.Anything).Return( + func(msg string, tpl map[string]interface{}) string { + data, err := json.Marshal(tpl) + if err != nil { + panic(err) + } + return fmt.Sprintf("%s [%s]", msg, string(data)) + }) + return loc +} + +func (t *CounterProcessorTest) new( + nf NotifyFunc, pr ConnectionDataProvider, noLocalizer ...bool) (Processor, testutil.BufferedLogger) { + loc := t.localizer() + if len(noLocalizer) > 0 && noLocalizer[0] { + loc = nil + } + + log := testutil.NewBufferedLogger() + return CounterProcessor{ + Localizer: loc, + Logger: log, + Notifier: nf, + ConnectionDataProvider: pr, + Error: "default error", + FailureThreshold: DefaultFailureThreshold, + MinRequests: DefaultMinRequests, + Debug: true, + }, log +} + +func (t *CounterProcessorTest) notifier(err ...error) *notifierMock { + if len(err) > 0 && err[0] != nil { + return ¬ifierMock{err: err[0]} + } + return ¬ifierMock{} +} + +func (t *CounterProcessorTest) provider(notFound ...bool) ConnectionDataProvider { + if len(notFound) > 0 && notFound[0] { + return func(id int) (apiURL, apiKey, lang string, exists bool) { + return "", "", "", false + } + } + return func(id int) (apiURL, apiKey, lang string, exists bool) { + return t.apiURL, t.apiKey, t.lang, true + } +} + +func (t *CounterProcessorTest) counter() mockedCounter { + return &counterMock{} +} + +func (t *CounterProcessorTest) Test_FailureProcessed() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(true) + c.On("IsFailureProcessed").Return(true) + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Contains(log.String(), "skipping counter id=1 because its failure is already processed") +} + +func (t *CounterProcessorTest) Test_CounterFailed_CannotFindConnection() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider(true)) + c := t.counter() + c.On("IsFailed").Return(true) + c.On("IsFailureProcessed").Return(false) + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Contains(log.String(), "cannot find connection data for counter id=1") +} + +func (t *CounterProcessorTest) Test_CounterFailed_ErrWhileNotifying() { + n := t.notifier(errors.New("http status code: 500")) + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(true) + c.On("IsFailureProcessed").Return(false) + c.On("Message").Return("error message") + c.On("FailureProcessed").Return() + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Contains(log.String(), "cannot send notification for counter id=1: http status code: 500 (message: error message)") + t.Assert().Equal(t.apiURL, n.apiURL) + t.Assert().Equal(t.apiKey, n.apiKey) + t.Assert().Equal("error message", n.message) +} + +func (t *CounterProcessorTest) Test_CounterFailed_SentNotification() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(true) + c.On("IsFailureProcessed").Return(false) + c.On("Message").Return("error message") + c.On("FailureProcessed").Return() + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Empty(log.String()) + t.Assert().Equal(t.apiURL, n.apiURL) + t.Assert().Equal(t.apiKey, n.apiKey) + t.Assert().Equal("error message", n.message) +} + +func (t *CounterProcessorTest) Test_TooFewRequests() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(0)) + c.On("TotalSucceeded").Return(uint32(DefaultMinRequests - 1)) + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Contains(log.String(), + fmt.Sprintf("skipping counter id=%d because it has fewer than %d requests", 1, DefaultMinRequests)) +} + +func (t *CounterProcessorTest) Test_ThresholdNotPassed() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(20)) + c.On("TotalSucceeded").Return(uint32(80)) + c.On("ClearCountersProcessed").Return() + c.On("FlushCounters").Return() + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Empty(log.String()) + t.Assert().Empty(n.message) +} + +func (t *CounterProcessorTest) Test_ThresholdPassed_AlreadyProcessed() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(21)) + c.On("TotalSucceeded").Return(uint32(79)) + c.On("IsCountersProcessed").Return(true) + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Empty(log.String()) + t.Assert().Empty(n.message) +} + +func (t *CounterProcessorTest) Test_ThresholdPassed_NoConnectionFound() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider(true)) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(21)) + c.On("TotalSucceeded").Return(uint32(79)) + c.On("IsCountersProcessed").Return(false) + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Contains(log.String(), "cannot find connection data for counter id=1") + t.Assert().Empty(n.message) +} + +func (t *CounterProcessorTest) Test_ThresholdPassed_NotifyingError() { + n := t.notifier(errors.New("unknown error")) + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(21)) + c.On("TotalSucceeded").Return(uint32(79)) + c.On("IsCountersProcessed").Return(false) + c.On("Name").Return("MockedCounter") + c.On("Message").Return("") + c.On("CountersProcessed").Return() + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Contains(log.String(), "cannot send notification for counter id=1: unknown error (message: )") + t.Assert().Equal(`default error [{"Name":"MockedCounter"}]`, n.message) +} + +func (t *CounterProcessorTest) Test_ThresholdPassed_NotificationSent() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider()) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(21)) + c.On("TotalSucceeded").Return(uint32(79)) + c.On("IsCountersProcessed").Return(false) + c.On("Name").Return("MockedCounter") + c.On("CountersProcessed").Return() + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Empty(log.String()) + t.Assert().Equal(`default error [{"Name":"MockedCounter"}]`, n.message) +} + +func (t *CounterProcessorTest) Test_ThresholdPassed_NotificationSent_NoLocalizer() { + n := t.notifier() + p, log := t.new(n.Notify, t.provider(), true) + c := t.counter() + c.On("IsFailed").Return(false) + c.On("TotalFailed").Return(uint32(21)) + c.On("TotalSucceeded").Return(uint32(79)) + c.On("IsCountersProcessed").Return(false) + c.On("Name").Return("MockedCounter") + c.On("CountersProcessed").Return() + + p.Process(1, c) + c.AssertExpectations(t.T()) + t.Assert().Empty(log.String()) + t.Assert().Equal(`default error`, n.message) +} + +type localizerMock struct { + mock.Mock +} + +func (l *localizerMock) SetLocale(lang string) { + l.Called(lang) +} + +func (l *localizerMock) GetLocalizedTemplateMessage(messageID string, templateData map[string]interface{}) string { + args := l.Called(messageID, templateData) + if fn, ok := args.Get(0).(func(string, map[string]interface{}) string); ok { + return fn(messageID, templateData) + } + return args.String(0) +} + +type mockedCounter interface { + Counter + On(methodName string, arguments ...interface{}) *mock.Call + AssertExpectations(t mock.TestingT) bool +} + +type counterMock struct { + mock.Mock +} + +func (cm *counterMock) Name() string { + args := cm.Called() + return args.String(0) +} + +func (cm *counterMock) SetName(name string) { + cm.Called(name) +} + +func (cm *counterMock) HitSuccess() { + cm.Called() +} + +func (cm *counterMock) HitFailure() { + cm.Called() +} + +func (cm *counterMock) TotalSucceeded() uint32 { + args := cm.Called() + return args.Get(0).(uint32) +} + +func (cm *counterMock) TotalFailed() uint32 { + args := cm.Called() + return args.Get(0).(uint32) +} + +func (cm *counterMock) Message() string { + args := cm.Called() + return args.String(0) +} + +func (cm *counterMock) IsFailed() bool { + args := cm.Called() + return args.Bool(0) +} + +func (cm *counterMock) Failed(message string) { + cm.Called(message) +} + +func (cm *counterMock) IsFailureProcessed() bool { + args := cm.Called() + return args.Bool(0) +} + +func (cm *counterMock) IsCountersProcessed() bool { + args := cm.Called() + return args.Bool(0) +} + +func (cm *counterMock) FailureProcessed() { + cm.Called() +} + +func (cm *counterMock) CountersProcessed() { + cm.Called() +} + +func (cm *counterMock) ClearCountersProcessed() { + cm.Called() +} + +func (cm *counterMock) FlushCounters() { + cm.Called() +} + +type notifierMock struct { + err error + apiURL string + apiKey string + message string +} + +func (n *notifierMock) Notify(apiURL, apiKey, msg string) error { + n.apiURL = apiURL + n.apiKey = apiKey + n.message = msg + return n.err +} diff --git a/core/healthcheck/storage.go b/core/healthcheck/storage.go new file mode 100644 index 0000000..1c7dd47 --- /dev/null +++ b/core/healthcheck/storage.go @@ -0,0 +1,38 @@ +package healthcheck + +import "sync" + +// SyncMapStorage is a default Storage implementation. It uses sync.Map under the hood because +// deletions should be rare for the storage. If your business logic calls Remove often, it would be better +// to use your own implementation with map[int]Counter and sync.RWMutex. +type SyncMapStorage struct { + constructor CounterConstructor + m sync.Map +} + +// NewSyncMapStorage is a SyncMapStorage constructor. +func NewSyncMapStorage(constructor CounterConstructor) Storage { + return &SyncMapStorage{constructor: constructor} +} + +func (s *SyncMapStorage) Get(id int, name string) Counter { + val, found := s.m.Load(id) + if found { + counter := val.(Counter) + counter.SetName(name) + return val.(Counter) + } + c := s.constructor(name) + s.m.Store(id, c) + return c +} + +func (s *SyncMapStorage) Remove(id int) { + s.m.Delete(id) +} + +func (s *SyncMapStorage) Process(proc Processor) { + s.m.Range(func(key, value any) bool { + return proc.Process(key.(int), value.(Counter)) + }) +} diff --git a/core/healthcheck/storage_test.go b/core/healthcheck/storage_test.go new file mode 100644 index 0000000..fff2a0b --- /dev/null +++ b/core/healthcheck/storage_test.go @@ -0,0 +1,69 @@ +package healthcheck + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/suite" +) + +type SyncMapStorageTest struct { + suite.Suite + storage Storage +} + +func TestSyncMapStorage(t *testing.T) { + t.Parallel() + suite.Run(t, new(SyncMapStorageTest)) +} + +func (t *SyncMapStorageTest) SetupSuite() { + t.storage = NewSyncMapStorage(NewAtomicCounter) +} + +func (t *SyncMapStorageTest) Test_Get() { + counter := t.storage.Get(1, "Name") + t.Assert().NotNil(counter) + t.Assert().IsType(&AtomicCounter{}, counter) + t.Assert().Equal("Name", counter.Name()) + + newCounter := t.storage.Get(1, "New Name") + t.Assert().Equal(counter, newCounter) + t.Assert().Equal("New Name", newCounter.Name()) +} + +func (t *SyncMapStorageTest) Test_Process() { + var wg sync.WaitGroup + wg.Add(1) + t.storage.Process(storageCallbackProcessor{callback: func(id int, counter Counter) bool { + t.Assert().Equal(1, id) + t.Assert().Equal("New Name", counter.Name()) + wg.Done() + return false + }}) + + wg.Wait() +} + +func (t *SyncMapStorageTest) Test_Remove() { + defer func() { + if r := recover(); r != nil { + t.Fail("unexpected panic:", r) + } + }() + t.storage.Remove(0) + t.storage.Remove(-1) + t.storage.Remove(1) + t.storage.Process(storageCallbackProcessor{callback: func(id int, counter Counter) bool { + t.Fail("did not expect any items:", id, counter) + return false + }}) +} + +type storageCallbackProcessor struct { + callback func(id int, counter Counter) bool +} + +func (p storageCallbackProcessor) Process(id int, counter Counter) bool { + return p.callback(id, counter) +} diff --git a/core/job_manager.go b/core/job_manager.go index d6fc49a..fac84de 100644 --- a/core/job_manager.go +++ b/core/job_manager.go @@ -12,6 +12,15 @@ import ( // JobFunc is empty func which should be executed in a parallel goroutine. type JobFunc func(logger.Logger) error +// JobAfterCallback will be called after specific job is done. +// This can be used to run something after asynchronous job is done. The function will +// receive an error from the job which can be used to alter the callback behavior. If callback +// returns an error, it will be processed using job ErrorHandler. +// The callback won't be executed in case of panic. Panicked callback will be show in logs +// as if the job itself panicked, ergo, do not write jobs or callbacks that can panic, +// or process the panic by yourself. +type JobAfterCallback func(jobError error, log logger.Logger) error + // JobErrorHandler is a function to handle jobs errors. First argument is a job name. type JobErrorHandler func(string, error, logger.Logger) @@ -32,21 +41,22 @@ type Job struct { // JobManager controls jobs execution flow. Jobs can be added just for later use (e.g. JobManager can be used as // singleton), or jobs can be executed as regular jobs. Example initialization: -// manager := NewJobManager(). -// SetLogger(logger). -// SetLogging(false) -// _ = manager.RegisterJob("updateTokens", &Job{ -// Command: func(log logger.Logger) error { -// // logic goes here... -// logger.Info("All tokens were updated successfully") -// return nil -// }, -// ErrorHandler: DefaultJobErrorHandler(), -// PanicHandler: DefaultJobPanicHandler(), -// Interval: time.Hour * 3, -// Regular: true, -// }) -// manager.Start() +// +// manager := NewJobManager(). +// SetLogger(logger). +// SetLogging(false) +// _ = manager.RegisterJob("updateTokens", &Job{ +// Command: func(log logger.Logger) error { +// // logic goes here... +// logger.Info("All tokens were updated successfully") +// return nil +// }, +// ErrorHandler: DefaultJobErrorHandler(), +// PanicHandler: DefaultJobPanicHandler(), +// Interval: time.Hour * 3, +// Regular: true, +// }) +// manager.Start() type JobManager struct { logger logger.Logger nilLogger logger.Logger @@ -55,17 +65,24 @@ type JobManager struct { } // getWrappedFunc wraps job into function. -func (j *Job) getWrappedFunc(name string, log logger.Logger) func() { - return func() { +func (j *Job) getWrappedFunc(name string, log logger.Logger) func(callback JobAfterCallback) { + return func(callback JobAfterCallback) { defer func() { if r := recover(); r != nil && j.PanicHandler != nil { j.PanicHandler(name, r, log) } }() - if err := j.Command(log); err != nil && j.ErrorHandler != nil { + err := j.Command(log) + if err != nil && j.ErrorHandler != nil { j.ErrorHandler(name, err, log) } + if callback != nil { + err := callback(err, log) + if j.ErrorHandler != nil { + j.ErrorHandler(name, err, log) + } + } } } @@ -77,7 +94,7 @@ func (j *Job) getWrappedTimerFunc(name string, log logger.Logger) func(chan bool case <-stopChannel: return default: - j.getWrappedFunc(name, log)() + j.getWrappedFunc(name, log)(nil) } } } @@ -118,13 +135,13 @@ func (j *Job) stop() { } // runOnce run job once. -func (j *Job) runOnce(name string, log logger.Logger) { - go j.getWrappedFunc(name, log)() +func (j *Job) runOnce(name string, log logger.Logger, callback JobAfterCallback) { + go j.getWrappedFunc(name, log)(callback) } // runOnceSync run job once in current goroutine. func (j *Job) runOnceSync(name string, log logger.Logger) { - j.getWrappedFunc(name, log)() + j.getWrappedFunc(name, log)(nil) } // NewJobManager is a JobManager constructor. @@ -241,15 +258,52 @@ func (j *JobManager) StopJob(name string) error { } // RunJobOnce starts provided job once if it exists. It's also async. -func (j *JobManager) RunJobOnce(name string) error { +func (j *JobManager) RunJobOnce(name string, callback ...JobAfterCallback) error { if job, ok := j.FetchJob(name); ok { - job.runOnce(name, j.Logger()) + var cb JobAfterCallback + if len(callback) > 0 { + cb = callback[0] + } + job.runOnce(name, j.Logger(), cb) return nil } return fmt.Errorf("cannot find job `%s`", name) } +// RunJobsOnceSequentially will execute provided jobs asynchronously. It uses JobAfterCallback under the hood. +// You can prevent subsequent jobs from running using stopOnError flag. +func (j *JobManager) RunJobsOnceSequentially(names []string, stopOnError bool) error { + if len(names) == 0 { + return nil + } + + var chained JobAfterCallback + for i := len(names) - 1; i > 0; i-- { + i := i + if chained == nil { + chained = func(jobError error, log logger.Logger) error { + if jobError != nil && stopOnError { + return jobError + } + return j.RunJobOnce(names[i]) + } + continue + } + + oldCallback := chained + chained = func(jobError error, log logger.Logger) error { + if jobError != nil && stopOnError { + return jobError + } + err := j.RunJobOnce(names[i], oldCallback) + return err + } + } + + return j.RunJobOnce(names[0], chained) +} + // RunJobOnceSync starts provided job once in current goroutine if job exists. Will wait for job to end it's work. func (j *JobManager) RunJobOnceSync(name string) error { if job, ok := j.FetchJob(name); ok { diff --git a/core/job_manager_test.go b/core/job_manager_test.go index b71a86a..6da22de 100644 --- a/core/job_manager_test.go +++ b/core/job_manager_test.go @@ -293,7 +293,7 @@ func (t *JobTest) Test_getWrappedFunc() { t.onceJob() fn := t.job.getWrappedFunc("job", t.testLogger()) require.NotNil(t.T(), fn) - go fn() + go fn(nil) assert.True(t.T(), t.executed()) assert.False(t.T(), t.errored(time.Millisecond)) assert.False(t.T(), t.panicked(time.Millisecond)) @@ -308,7 +308,7 @@ func (t *JobTest) Test_getWrappedFuncError() { t.onceErrorJob() fn := t.job.getWrappedFunc("job", t.testLogger()) require.NotNil(t.T(), fn) - go fn() + go fn(nil) assert.True(t.T(), t.executed()) assert.True(t.T(), t.errored(time.Millisecond)) assert.False(t.T(), t.panicked(time.Millisecond)) @@ -323,7 +323,7 @@ func (t *JobTest) Test_getWrappedFuncPanic() { t.oncePanicJob() fn := t.job.getWrappedFunc("job", t.testLogger()) require.NotNil(t.T(), fn) - go fn() + go fn(nil) assert.True(t.T(), t.executed()) assert.False(t.T(), t.errored(time.Millisecond)) assert.True(t.T(), t.panicked(time.Millisecond)) @@ -347,7 +347,7 @@ func (t *JobTest) Test_runOnce() { }() t.regularJob() - t.job.runOnce("job", t.testLogger()) + t.job.runOnce("job", t.testLogger(), nil) time.Sleep(time.Millisecond * 5) require.True(t.T(), t.executed()) first := 0 diff --git a/core/module_featurees_uploader_test.go b/core/module_features_uploader_test.go similarity index 100% rename from core/module_featurees_uploader_test.go rename to core/module_features_uploader_test.go diff --git a/go.mod b/go.mod index d48d79e..d8afc1b 100644 --- a/go.mod +++ b/go.mod @@ -1,43 +1,79 @@ module github.com/retailcrm/mg-transport-core/v2 -go 1.12 +go 1.18 require ( - cloud.google.com/go v0.60.0 // indirect github.com/DATA-DOG/go-sqlmock v1.3.3 github.com/aws/aws-sdk-go v1.36.30 github.com/aws/aws-sdk-go-v2 v1.17.2 github.com/aws/aws-sdk-go-v2/config v1.18.4 github.com/aws/aws-sdk-go-v2/service/s3 v1.29.5 github.com/blacked/go-zabbix v0.0.0-20170118040903-3c6a95ec4fdc - github.com/denisenkom/go-mssqldb v0.0.0-20190830225923-3302f0226fbd // indirect github.com/getsentry/sentry-go v0.12.0 github.com/gin-contrib/multitemplate v0.0.0-20190914010127-bba2ccfe37ec github.com/gin-gonic/gin v1.7.2 github.com/go-playground/validator/v10 v10.8.0 - github.com/go-sql-driver/mysql v1.5.0 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/gomarkdown/markdown v0.0.0-20221013030248-663e2500819c github.com/gorilla/securecookie v1.1.1 github.com/gorilla/sessions v1.2.0 github.com/jessevdk/go-flags v1.4.0 github.com/jinzhu/gorm v1.9.11 - github.com/json-iterator/go v1.1.11 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/lib/pq v1.9.0 // indirect github.com/nicksnyder/go-i18n/v2 v2.0.2 - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/pkg/errors v0.9.1 - github.com/retailcrm/api-client-go/v2 v2.0.12 + github.com/retailcrm/api-client-go/v2 v2.1.3 github.com/retailcrm/mg-transport-api-client-go v1.1.32 github.com/retailcrm/zabbix-metrics-collector v1.0.0 github.com/stretchr/testify v1.8.1 - github.com/ugorji/go v1.2.6 // indirect + go.uber.org/atomic v1.10.0 golang.org/x/text v0.3.7 - google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/gormigrate.v1 v1.6.0 gopkg.in/h2non/gock.v1 v1.1.2 gopkg.in/yaml.v2 v2.4.0 ) + +require ( + cloud.google.com/go v0.60.0 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.21 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.20 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect + github.com/aws/smithy-go v1.13.5 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/denisenkom/go-mssqldb v0.0.0-20190830225923-3302f0226fbd // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-playground/locales v0.13.0 // indirect + github.com/go-playground/universal-translator v0.17.0 // indirect + github.com/go-sql-driver/mysql v1.5.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-querystring v1.1.0 // indirect + github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/json-iterator/go v1.1.11 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/leodido/go-urn v1.2.1 // indirect + github.com/lib/pq v1.9.0 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + github.com/ugorji/go/codec v1.2.6 // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect + golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect + google.golang.org/protobuf v1.27.1 // indirect + gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 27f0915..6d8dfcd 100644 --- a/go.sum +++ b/go.sum @@ -361,8 +361,8 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/retailcrm/api-client-go/v2 v2.0.12 h1:0pMDsz4fzKpizUxoF1qtgwN+oJ+YJruncCj6OTOB3e8= -github.com/retailcrm/api-client-go/v2 v2.0.12/go.mod h1:1yTZl9+gd3+/k0kAJe7sYvC+mL4fqMwIwtnSgSWZlkQ= +github.com/retailcrm/api-client-go/v2 v2.1.3 h1:AVcp9oeSOm6+3EWXCgdQs+XE3PTjzCKKB//MUAe0Zb0= +github.com/retailcrm/api-client-go/v2 v2.1.3/go.mod h1:1yTZl9+gd3+/k0kAJe7sYvC+mL4fqMwIwtnSgSWZlkQ= github.com/retailcrm/mg-transport-api-client-go v1.1.32 h1:IBPltSoD5q2PPZJbNC/prK5F9rEVPXVx/ZzDpi7HKhs= github.com/retailcrm/mg-transport-api-client-go v1.1.32/go.mod h1:AWV6BueE28/6SCoyfKURTo4lF0oXYoOKmHTzehd5vAI= github.com/retailcrm/zabbix-metrics-collector v1.0.0 h1:ju3rhpgVoiKII6oXEJEf2eoJy5bNcYAmOPRp1oPWDmA= @@ -385,6 +385,7 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -397,7 +398,6 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go v1.2.6 h1:tGiWC9HENWE2tqYycIqFTNorMmFRVhNwCpDOpWqnk8E= github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= @@ -424,6 +424,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181112202954-3d3f9f413869/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=