From 2f33b56cd3dc93706d61fdca49b1a11f55b2245e Mon Sep 17 00:00:00 2001 From: Neur0toxine Date: Tue, 18 Mar 2025 16:53:45 +0300 Subject: [PATCH] rate limiter --- client.go | 325 +++++++++++++++++++++++++++++++++---------------- client_test.go | 133 ++++++++++++++++++-- error.go | 2 + types.go | 25 +++- 4 files changed, 370 insertions(+), 115 deletions(-) diff --git a/client.go b/client.go index 94a13eb..f94e587 100644 --- a/client.go +++ b/client.go @@ -17,10 +17,6 @@ import ( "github.com/google/go-querystring/query" ) -// HTTPStatusUnknown can return for the method `/api/v5/customers/upload`, `/api/v5/customers-corporate/upload`, -// `/api/v5/orders/upload`. -const HTTPStatusUnknown = 460 - // New initialize client. func New(url string, key string) *Client { return &Client{ @@ -36,6 +32,120 @@ func (c *Client) WithLogger(logger BasicLogger) *Client { return c } +// EnableRateLimiter activates rate limiting with specified retry attempts. +func (c *Client) EnableRateLimiter(maxAttempts uint) *Client { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.limiter = &RateLimiter{ + maxAttempts: maxAttempts, + lastRequest: time.Now().Add(-time.Second), // Initialize to allow immediate first request. + } + + return c +} + +// applyRateLimit applies rate limiting before sending a request. +func (c *Client) applyRateLimit(uri string) { + if c.limiter == nil { + return + } + + c.limiter.mutex.Lock() + defer c.limiter.mutex.Unlock() + + var delay time.Duration + if strings.HasPrefix(uri, "/telephony") { + delay = telephonyDelay + } else { + delay = regularDelay + } + + elapsed := time.Since(c.limiter.lastRequest) + if elapsed < delay { + time.Sleep(delay - elapsed) + } + + c.limiter.lastRequest = time.Now() +} + +func (c *Client) executeWithRetryBytes( + uri string, + executeFunc func() (interface{}, int, error), +) ([]byte, int, error) { + res, status, err := c.executeWithRetry(uri, executeFunc) + if res == nil { + return nil, status, err + } + return res.([]byte), status, err +} + +func (c *Client) executeWithRetryReadCloser( + uri string, + executeFunc func() (interface{}, int, error), +) (io.ReadCloser, int, error) { + res, status, err := c.executeWithRetry(uri, executeFunc) + if res == nil { + return nil, status, err + } + return res.(io.ReadCloser), status, err +} + +// executeWithRetry executes a request with retry logic for rate limiting. +func (c *Client) executeWithRetry( + uri string, + executeFunc func() (interface{}, int, error), +) (interface{}, int, error) { + if c.limiter == nil { + return executeFunc() + } + + var ( + res interface{} + statusCode int + err error + attempt uint = 1 + maxAttempts = c.limiter.maxAttempts + infinite = maxAttempts == 0 + lastAttempt = false + ) + + var baseDelay time.Duration + if strings.HasPrefix(uri, "/telephony") { + baseDelay = telephonyDelay + } else { + baseDelay = regularDelay + } + + for infinite || attempt <= maxAttempts { + c.applyRateLimit(uri) + res, statusCode, err = executeFunc() + lastAttempt = !infinite && attempt == maxAttempts + + // If rate limited on final attempt, set error to ErrRateLimited. Return results otherwise. + if statusCode == http.StatusServiceUnavailable && lastAttempt { + return res, statusCode, ErrRateLimited + } + + // If not rate limited or on final attempt, return result. + if statusCode != http.StatusServiceUnavailable || lastAttempt { + return res, statusCode, err + } + + // Calculate exponential backoff delay: baseDelay * 2^(attempt-1). + backoffDelay := baseDelay * (1 << (attempt - 1)) + if c.Debug { + c.writeLog("API Error: rate limited (503), retrying in %v (attempt %d/%d)", + backoffDelay, attempt, maxAttempts) + } + + time.Sleep(backoffDelay) + attempt++ + } + + return res, statusCode, err +} + // writeLog writes to the log. func (c *Client) writeLog(format string, v ...interface{}) { if c.logger != nil { @@ -48,7 +158,6 @@ func (c *Client) writeLog(format string, v ...interface{}) { // GetRequest implements GET Request. func (c *Client) GetRequest(urlWithParameters string, versioned ...bool) ([]byte, int, error) { - var res []byte var prefix = "/api/v5" if len(versioned) > 0 { @@ -57,41 +166,49 @@ func (c *Client) GetRequest(urlWithParameters string, versioned ...bool) ([]byte } } - req, err := http.NewRequest("GET", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), nil) - if err != nil { - return res, 0, err - } + uri := urlWithParameters - req.Header.Set("X-API-KEY", c.Key) + return c.executeWithRetryBytes(uri, func() (interface{}, int, error) { + var res []byte - if c.Debug { - c.writeLog("API Request: %s %s", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), c.Key) - } + req, err := http.NewRequest("GET", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), nil) + if err != nil { + return res, 0, err + } - resp, err := c.httpClient.Do(req) - if err != nil { - return res, 0, err - } + req.Header.Set("X-API-KEY", c.Key) - if resp.StatusCode >= http.StatusInternalServerError { - return res, resp.StatusCode, CreateGenericAPIError( - fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode)) - } + if c.Debug { + c.writeLog("API Request: %s %s", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), c.Key) + } - res, err = buildRawResponse(resp) - if err != nil { - return res, 0, err - } + resp, err := c.httpClient.Do(req) + if err != nil { + return res, 0, err + } - if resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError { - return res, resp.StatusCode, CreateAPIError(res) - } + if resp.StatusCode >= http.StatusInternalServerError && resp.StatusCode != http.StatusServiceUnavailable { + return res, resp.StatusCode, CreateGenericAPIError( + fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode)) + } - if c.Debug { - c.writeLog("API Response: %s", res) - } + res, err = buildRawResponse(resp) + if err != nil { + return res, 0, err + } - return res, resp.StatusCode, nil + if resp.StatusCode >= http.StatusBadRequest && + resp.StatusCode < http.StatusInternalServerError && + resp.StatusCode != http.StatusServiceUnavailable { + return res, resp.StatusCode, CreateAPIError(res) + } + + if c.Debug { + c.writeLog("API Response: %s", res) + } + + return res, resp.StatusCode, nil + }) } // PostRequest implements POST Request with generic body data. @@ -100,12 +217,7 @@ func (c *Client) PostRequest( postData interface{}, contType ...string, ) ([]byte, int, error) { - var ( - res []byte - contentType string - ) - - prefix := "/api/v5" + var contentType string if len(contType) > 0 { contentType = contType[0] @@ -113,47 +225,55 @@ func (c *Client) PostRequest( contentType = "application/x-www-form-urlencoded" } - reader, err := getReaderForPostData(postData) - if err != nil { - return res, 0, err - } + prefix := "/api/v5" - req, err := http.NewRequest("POST", fmt.Sprintf("%s%s%s", c.URL, prefix, uri), reader) - if err != nil { - return res, 0, err - } + return c.executeWithRetryBytes(uri, func() (interface{}, int, error) { + var res []byte - req.Header.Set("Content-Type", contentType) - req.Header.Set("X-API-KEY", c.Key) + reader, err := getReaderForPostData(postData) + if err != nil { + return res, 0, err + } - if c.Debug { - c.writeLog("API Request: %s %s", uri, c.Key) - } + req, err := http.NewRequest("POST", fmt.Sprintf("%s%s%s", c.URL, prefix, uri), reader) + if err != nil { + return res, 0, err + } - resp, err := c.httpClient.Do(req) - if err != nil { - return res, 0, err - } + req.Header.Set("Content-Type", contentType) + req.Header.Set("X-API-KEY", c.Key) - if resp.StatusCode >= http.StatusInternalServerError { - return res, resp.StatusCode, CreateGenericAPIError( - fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode)) - } + if c.Debug { + c.writeLog("API Request: %s %s", uri, c.Key) + } - res, err = buildRawResponse(resp) - if err != nil { - return res, 0, err - } + resp, err := c.httpClient.Do(req) + if err != nil { + return res, 0, err + } - if resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError { - return res, resp.StatusCode, CreateAPIError(res) - } + if resp.StatusCode >= http.StatusInternalServerError && resp.StatusCode != http.StatusServiceUnavailable { + return res, resp.StatusCode, CreateGenericAPIError( + fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode)) + } - if c.Debug { - c.writeLog("API Response: %s", res) - } + res, err = buildRawResponse(resp) + if err != nil { + return res, 0, err + } - return res, resp.StatusCode, nil + if resp.StatusCode >= http.StatusBadRequest && + resp.StatusCode < http.StatusInternalServerError && + resp.StatusCode != http.StatusServiceUnavailable { + return res, resp.StatusCode, CreateAPIError(res) + } + + if c.Debug { + c.writeLog("API Response: %s", res) + } + + return res, resp.StatusCode, nil + }) } func getReaderForPostData(postData interface{}) (io.Reader, error) { @@ -6762,53 +6882,52 @@ func (c *Client) EditProductsGroup(by, id, site string, group ProductGroup) (Act // log.Printf("%s", fileData) // } func (c *Client) GetOrderPlate(by, orderID, site string, plateID int) (io.ReadCloser, int, error) { - p := url.Values{ + requestURL := fmt.Sprintf("%s/api/v5/orders/%s/plates/%d/print?%s", c.URL, orderID, plateID, url.Values{ "by": {checkBy(by)}, "site": {site}, - } + }.Encode()) - requestURL := fmt.Sprintf("%s/api/v5/orders/%s/plates/%d/print?%s", c.URL, orderID, plateID, p.Encode()) - req, err := http.NewRequest("GET", requestURL, nil) + return c.executeWithRetryReadCloser(requestURL, func() (interface{}, int, error) { + req, err := http.NewRequest("GET", requestURL, nil) + if err != nil { + return nil, 0, err + } - if err != nil { - return nil, 0, err - } + req.Header.Set("X-API-KEY", c.Key) - req.Header.Set("X-API-KEY", c.Key) + if c.Debug { + c.writeLog("API Request: %s %s", requestURL, c.Key) + } - if c.Debug { - c.writeLog("API Request: %s %s", requestURL, c.Key) - } - - resp, err := c.httpClient.Do(req) - - if err != nil { - return nil, 0, err - } - - if resp.StatusCode >= http.StatusInternalServerError { - return nil, resp.StatusCode, CreateGenericAPIError( - fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode)) - } - - if resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError { - res, err := buildRawResponse(resp) + resp, err := c.httpClient.Do(req) if err != nil { return nil, 0, err } - return nil, resp.StatusCode, CreateAPIError(res) - } + if resp.StatusCode >= http.StatusInternalServerError && resp.StatusCode != http.StatusServiceUnavailable { + return nil, resp.StatusCode, CreateGenericAPIError( + fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode)) + } - reader := resp.Body - err = reader.Close() + if resp.StatusCode >= http.StatusBadRequest && + resp.StatusCode < http.StatusInternalServerError && + resp.StatusCode != http.StatusServiceUnavailable { + res, err := buildRawResponse(resp) - if err != nil { - return nil, 0, err - } + if err != nil { + return nil, 0, err + } - return reader, resp.StatusCode, nil + return nil, resp.StatusCode, CreateAPIError(res) + } + + if err != nil { + return nil, 0, err + } + + return resp.Body, resp.StatusCode, nil + }) } // NotificationsSend send a notification diff --git a/client_test.go b/client_test.go index 398be36..5d0792c 100644 --- a/client_test.go +++ b/client_test.go @@ -3,6 +3,7 @@ package retailcrm import ( "encoding/json" "fmt" + "github.com/stretchr/testify/require" "io/ioutil" "log" "math/rand" @@ -81,21 +82,131 @@ func TestBaseURLTrimmed(t *testing.T) { assert.Equal(t, c1.URL, c3.URL) } -func TestGetRequest(t *testing.T) { - c := client() +func TestGetRequestWithRateLimiter(t *testing.T) { + t.Run("Basic 404 response", func(t *testing.T) { + c := client() - defer gock.Off() + defer gock.Off() - gock.New(crmURL). - Get("/api/v5/fake-method"). - Reply(404). - BodyString(`{"success": false, "errorMsg" : "Method not found"}`) + gock.New(crmURL). + Get("/api/v5/fake-method"). + Reply(404). + BodyString(`{"success": false, "errorMsg" : "Method not found"}`) - _, status, _ := c.GetRequest("/fake-method") + _, status, _ := c.GetRequest("/fake-method") - if status != http.StatusNotFound { - t.Fail() - } + assert.Equal(t, http.StatusNotFound, status) + }) + + t.Run("Rate limiter respects configured RPS", func(t *testing.T) { + c := client() + c.EnableRateLimiter(3) + + defer gock.Off() + + numRequests := 5 + + for i := 0; i < numRequests; i++ { + gock.New(crmURL). + Get("/api/v5/test-method"). + Reply(200). + BodyString(`{"success": true}`) + } + + start := time.Now() + for i := 0; i < numRequests; i++ { + _, _, err := c.GetRequest("/test-method") + if err != nil { + t.Fatalf("Request %d failed: %v", i, err) + } + } + + elapsed := time.Since(start) + minExpectedTime := time.Duration(numRequests-1) * time.Second / 10 + assert.Truef(t, elapsed > minExpectedTime, + "Rate limiter not working correctly. Expected minimum time %v, got %v", + minExpectedTime, elapsed) + }) + + t.Run("Rate limiter respects telephony endpoint RPS", func(t *testing.T) { + c := client() + c.EnableRateLimiter(3) + + defer gock.Off() + + numRequests := 5 + + for i := 0; i < numRequests; i++ { + gock.New(crmURL). + Get("/api/v5/telephony/test-call"). + Reply(200). + BodyString(`{"success": true}`) + } + + start := time.Now() + + for i := 0; i < numRequests; i++ { + _, _, err := c.GetRequest("/telephony/test-call") + if err != nil { + t.Fatalf("Request %d failed: %v", i, err) + } + } + + elapsed := time.Since(start) + minExpectedTime := time.Duration(numRequests-1) * time.Second / 40 + assert.Truef(t, elapsed > minExpectedTime, + "Rate limiter not working correctly for telephony. Expected minimum time %v, got %v", + minExpectedTime, elapsed) + }) + + t.Run("Rate limiter retries on 503 responses", func(t *testing.T) { + c := client() + c.EnableRateLimiter(3) + c.Debug = true + + defer gock.Off() + + gock.New(crmURL). + Get("/api/v5/retry-test"). + Reply(503). + BodyString(`{"success": false, "errorMsg": "Rate limit exceeded"}`) + + gock.New(crmURL). + Get("/api/v5/retry-test"). + Reply(503). + BodyString(`{"success": false, "errorMsg": "Rate limit exceeded"}`) + + gock.New(crmURL). + Get("/api/v5/retry-test"). + Reply(200). + BodyString(`{"success": true}`) + + _, status, err := c.GetRequest("/retry-test") + + require.NoErrorf(t, err, "Request failed despite retries: %v", err) + assert.Equal(t, http.StatusOK, status) + assert.True(t, gock.IsDone(), "Not all expected requests were made") + }) + + t.Run("Rate limiter gives up after max attempts", func(t *testing.T) { + c := client() + c.EnableRateLimiter(2) + + defer gock.OffAll() + + for i := 0; i < 3; i++ { + gock.New(crmURL). + Get("/api/v5/retry-test"). + Reply(503). + BodyString(`{"success": false, "errorMsg": "Rate limit exceeded"}`) + } + + _, status, err := c.GetRequest("/retry-test") + + assert.Equalf(t, http.StatusServiceUnavailable, status, + "Expected status 503 after max retries, got %d", status) + assert.ErrorIs(t, err, ErrRateLimited, "Expected error after max retries, got nil") + }) } func TestPostRequest(t *testing.T) { diff --git a/error.go b/error.go index 01e946e..33ede3d 100644 --- a/error.go +++ b/error.go @@ -9,6 +9,8 @@ import ( var missingParameterMatcher = regexp.MustCompile(`^Parameter \'([\w\]\[\_\-]+)\' is missing$`) var ( + // ErrRateLimited will be returned if request was rate limited. + ErrRateLimited = NewAPIError("rate limit exceeded") // ErrMissingCredentials will be returned if no API key was provided to the API. ErrMissingCredentials = NewAPIError(`apiKey is missing`) // ErrInvalidCredentials will be returned if provided API key is invalid. diff --git a/types.go b/types.go index 77e3424..94e4d3c 100644 --- a/types.go +++ b/types.go @@ -5,14 +5,28 @@ import ( "net/http" "reflect" "strings" + "sync" + "time" ) // ByID is "id" constant to use as `by` property in methods. const ByID = "id" -// ByExternalId is "externalId" constant to use as `by` property in methods. +// ByExternalID is "externalId" constant to use as `by` property in methods. const ByExternalID = "externalId" +// RateLimiter configuration constants +const ( + regularPathRPS = 10 // API rate limit (requests per second). + telephonyPathRPS = 40 // Telephony API endpoints rate limit (requests per second). + regularDelay = time.Second / regularPathRPS // Delay between regular requests. + telephonyDelay = time.Second / telephonyPathRPS // Delay between telephony requests. +) + +// HTTPStatusUnknown can return for the method `/api/v5/customers/upload`, `/api/v5/customers-corporate/upload`, +// `/api/v5/orders/upload`. +const HTTPStatusUnknown = 460 + // Client type. type Client struct { URL string @@ -20,6 +34,15 @@ type Client struct { Debug bool httpClient *http.Client logger BasicLogger + limiter *RateLimiter + mutex sync.Mutex +} + +// RateLimiter manages API request rates to prevent hitting rate limits. +type RateLimiter struct { + maxAttempts uint // Maximum number of retry attempts (0 = infinite). + lastRequest time.Time // Time of the last request. + mutex sync.Mutex } // Pagination type.