rate limiter

This commit is contained in:
Pavel 2025-03-18 16:53:45 +03:00
parent 2859073353
commit 2f33b56cd3
4 changed files with 370 additions and 115 deletions

325
client.go
View File

@ -17,10 +17,6 @@ import (
"github.com/google/go-querystring/query"
)
// HTTPStatusUnknown can return for the method `/api/v5/customers/upload`, `/api/v5/customers-corporate/upload`,
// `/api/v5/orders/upload`.
const HTTPStatusUnknown = 460
// New initialize client.
func New(url string, key string) *Client {
return &Client{
@ -36,6 +32,120 @@ func (c *Client) WithLogger(logger BasicLogger) *Client {
return c
}
// EnableRateLimiter activates rate limiting with specified retry attempts.
func (c *Client) EnableRateLimiter(maxAttempts uint) *Client {
c.mutex.Lock()
defer c.mutex.Unlock()
c.limiter = &RateLimiter{
maxAttempts: maxAttempts,
lastRequest: time.Now().Add(-time.Second), // Initialize to allow immediate first request.
}
return c
}
// applyRateLimit applies rate limiting before sending a request.
func (c *Client) applyRateLimit(uri string) {
if c.limiter == nil {
return
}
c.limiter.mutex.Lock()
defer c.limiter.mutex.Unlock()
var delay time.Duration
if strings.HasPrefix(uri, "/telephony") {
delay = telephonyDelay
} else {
delay = regularDelay
}
elapsed := time.Since(c.limiter.lastRequest)
if elapsed < delay {
time.Sleep(delay - elapsed)
}
c.limiter.lastRequest = time.Now()
}
func (c *Client) executeWithRetryBytes(
uri string,
executeFunc func() (interface{}, int, error),
) ([]byte, int, error) {
res, status, err := c.executeWithRetry(uri, executeFunc)
if res == nil {
return nil, status, err
}
return res.([]byte), status, err
}
func (c *Client) executeWithRetryReadCloser(
uri string,
executeFunc func() (interface{}, int, error),
) (io.ReadCloser, int, error) {
res, status, err := c.executeWithRetry(uri, executeFunc)
if res == nil {
return nil, status, err
}
return res.(io.ReadCloser), status, err
}
// executeWithRetry executes a request with retry logic for rate limiting.
func (c *Client) executeWithRetry(
uri string,
executeFunc func() (interface{}, int, error),
) (interface{}, int, error) {
if c.limiter == nil {
return executeFunc()
}
var (
res interface{}
statusCode int
err error
attempt uint = 1
maxAttempts = c.limiter.maxAttempts
infinite = maxAttempts == 0
lastAttempt = false
)
var baseDelay time.Duration
if strings.HasPrefix(uri, "/telephony") {
baseDelay = telephonyDelay
} else {
baseDelay = regularDelay
}
for infinite || attempt <= maxAttempts {
c.applyRateLimit(uri)
res, statusCode, err = executeFunc()
lastAttempt = !infinite && attempt == maxAttempts
// If rate limited on final attempt, set error to ErrRateLimited. Return results otherwise.
if statusCode == http.StatusServiceUnavailable && lastAttempt {
return res, statusCode, ErrRateLimited
}
// If not rate limited or on final attempt, return result.
if statusCode != http.StatusServiceUnavailable || lastAttempt {
return res, statusCode, err
}
// Calculate exponential backoff delay: baseDelay * 2^(attempt-1).
backoffDelay := baseDelay * (1 << (attempt - 1))
if c.Debug {
c.writeLog("API Error: rate limited (503), retrying in %v (attempt %d/%d)",
backoffDelay, attempt, maxAttempts)
}
time.Sleep(backoffDelay)
attempt++
}
return res, statusCode, err
}
// writeLog writes to the log.
func (c *Client) writeLog(format string, v ...interface{}) {
if c.logger != nil {
@ -48,7 +158,6 @@ func (c *Client) writeLog(format string, v ...interface{}) {
// GetRequest implements GET Request.
func (c *Client) GetRequest(urlWithParameters string, versioned ...bool) ([]byte, int, error) {
var res []byte
var prefix = "/api/v5"
if len(versioned) > 0 {
@ -57,41 +166,49 @@ func (c *Client) GetRequest(urlWithParameters string, versioned ...bool) ([]byte
}
}
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), nil)
if err != nil {
return res, 0, err
}
uri := urlWithParameters
req.Header.Set("X-API-KEY", c.Key)
return c.executeWithRetryBytes(uri, func() (interface{}, int, error) {
var res []byte
if c.Debug {
c.writeLog("API Request: %s %s", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), c.Key)
}
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), nil)
if err != nil {
return res, 0, err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return res, 0, err
}
req.Header.Set("X-API-KEY", c.Key)
if resp.StatusCode >= http.StatusInternalServerError {
return res, resp.StatusCode, CreateGenericAPIError(
fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode))
}
if c.Debug {
c.writeLog("API Request: %s %s", fmt.Sprintf("%s%s%s", c.URL, prefix, urlWithParameters), c.Key)
}
res, err = buildRawResponse(resp)
if err != nil {
return res, 0, err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return res, 0, err
}
if resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError {
return res, resp.StatusCode, CreateAPIError(res)
}
if resp.StatusCode >= http.StatusInternalServerError && resp.StatusCode != http.StatusServiceUnavailable {
return res, resp.StatusCode, CreateGenericAPIError(
fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode))
}
if c.Debug {
c.writeLog("API Response: %s", res)
}
res, err = buildRawResponse(resp)
if err != nil {
return res, 0, err
}
return res, resp.StatusCode, nil
if resp.StatusCode >= http.StatusBadRequest &&
resp.StatusCode < http.StatusInternalServerError &&
resp.StatusCode != http.StatusServiceUnavailable {
return res, resp.StatusCode, CreateAPIError(res)
}
if c.Debug {
c.writeLog("API Response: %s", res)
}
return res, resp.StatusCode, nil
})
}
// PostRequest implements POST Request with generic body data.
@ -100,12 +217,7 @@ func (c *Client) PostRequest(
postData interface{},
contType ...string,
) ([]byte, int, error) {
var (
res []byte
contentType string
)
prefix := "/api/v5"
var contentType string
if len(contType) > 0 {
contentType = contType[0]
@ -113,47 +225,55 @@ func (c *Client) PostRequest(
contentType = "application/x-www-form-urlencoded"
}
reader, err := getReaderForPostData(postData)
if err != nil {
return res, 0, err
}
prefix := "/api/v5"
req, err := http.NewRequest("POST", fmt.Sprintf("%s%s%s", c.URL, prefix, uri), reader)
if err != nil {
return res, 0, err
}
return c.executeWithRetryBytes(uri, func() (interface{}, int, error) {
var res []byte
req.Header.Set("Content-Type", contentType)
req.Header.Set("X-API-KEY", c.Key)
reader, err := getReaderForPostData(postData)
if err != nil {
return res, 0, err
}
if c.Debug {
c.writeLog("API Request: %s %s", uri, c.Key)
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s%s%s", c.URL, prefix, uri), reader)
if err != nil {
return res, 0, err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return res, 0, err
}
req.Header.Set("Content-Type", contentType)
req.Header.Set("X-API-KEY", c.Key)
if resp.StatusCode >= http.StatusInternalServerError {
return res, resp.StatusCode, CreateGenericAPIError(
fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode))
}
if c.Debug {
c.writeLog("API Request: %s %s", uri, c.Key)
}
res, err = buildRawResponse(resp)
if err != nil {
return res, 0, err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return res, 0, err
}
if resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError {
return res, resp.StatusCode, CreateAPIError(res)
}
if resp.StatusCode >= http.StatusInternalServerError && resp.StatusCode != http.StatusServiceUnavailable {
return res, resp.StatusCode, CreateGenericAPIError(
fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode))
}
if c.Debug {
c.writeLog("API Response: %s", res)
}
res, err = buildRawResponse(resp)
if err != nil {
return res, 0, err
}
return res, resp.StatusCode, nil
if resp.StatusCode >= http.StatusBadRequest &&
resp.StatusCode < http.StatusInternalServerError &&
resp.StatusCode != http.StatusServiceUnavailable {
return res, resp.StatusCode, CreateAPIError(res)
}
if c.Debug {
c.writeLog("API Response: %s", res)
}
return res, resp.StatusCode, nil
})
}
func getReaderForPostData(postData interface{}) (io.Reader, error) {
@ -6762,53 +6882,52 @@ func (c *Client) EditProductsGroup(by, id, site string, group ProductGroup) (Act
// log.Printf("%s", fileData)
// }
func (c *Client) GetOrderPlate(by, orderID, site string, plateID int) (io.ReadCloser, int, error) {
p := url.Values{
requestURL := fmt.Sprintf("%s/api/v5/orders/%s/plates/%d/print?%s", c.URL, orderID, plateID, url.Values{
"by": {checkBy(by)},
"site": {site},
}
}.Encode())
requestURL := fmt.Sprintf("%s/api/v5/orders/%s/plates/%d/print?%s", c.URL, orderID, plateID, p.Encode())
req, err := http.NewRequest("GET", requestURL, nil)
return c.executeWithRetryReadCloser(requestURL, func() (interface{}, int, error) {
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
return nil, 0, err
}
if err != nil {
return nil, 0, err
}
req.Header.Set("X-API-KEY", c.Key)
req.Header.Set("X-API-KEY", c.Key)
if c.Debug {
c.writeLog("API Request: %s %s", requestURL, c.Key)
}
if c.Debug {
c.writeLog("API Request: %s %s", requestURL, c.Key)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, 0, err
}
if resp.StatusCode >= http.StatusInternalServerError {
return nil, resp.StatusCode, CreateGenericAPIError(
fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode))
}
if resp.StatusCode >= http.StatusBadRequest && resp.StatusCode < http.StatusInternalServerError {
res, err := buildRawResponse(resp)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, 0, err
}
return nil, resp.StatusCode, CreateAPIError(res)
}
if resp.StatusCode >= http.StatusInternalServerError && resp.StatusCode != http.StatusServiceUnavailable {
return nil, resp.StatusCode, CreateGenericAPIError(
fmt.Sprintf("HTTP request error. Status code: %d.", resp.StatusCode))
}
reader := resp.Body
err = reader.Close()
if resp.StatusCode >= http.StatusBadRequest &&
resp.StatusCode < http.StatusInternalServerError &&
resp.StatusCode != http.StatusServiceUnavailable {
res, err := buildRawResponse(resp)
if err != nil {
return nil, 0, err
}
if err != nil {
return nil, 0, err
}
return reader, resp.StatusCode, nil
return nil, resp.StatusCode, CreateAPIError(res)
}
if err != nil {
return nil, 0, err
}
return resp.Body, resp.StatusCode, nil
})
}
// NotificationsSend send a notification

View File

@ -3,6 +3,7 @@ package retailcrm
import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/require"
"io/ioutil"
"log"
"math/rand"
@ -81,21 +82,131 @@ func TestBaseURLTrimmed(t *testing.T) {
assert.Equal(t, c1.URL, c3.URL)
}
func TestGetRequest(t *testing.T) {
c := client()
func TestGetRequestWithRateLimiter(t *testing.T) {
t.Run("Basic 404 response", func(t *testing.T) {
c := client()
defer gock.Off()
defer gock.Off()
gock.New(crmURL).
Get("/api/v5/fake-method").
Reply(404).
BodyString(`{"success": false, "errorMsg" : "Method not found"}`)
gock.New(crmURL).
Get("/api/v5/fake-method").
Reply(404).
BodyString(`{"success": false, "errorMsg" : "Method not found"}`)
_, status, _ := c.GetRequest("/fake-method")
_, status, _ := c.GetRequest("/fake-method")
if status != http.StatusNotFound {
t.Fail()
}
assert.Equal(t, http.StatusNotFound, status)
})
t.Run("Rate limiter respects configured RPS", func(t *testing.T) {
c := client()
c.EnableRateLimiter(3)
defer gock.Off()
numRequests := 5
for i := 0; i < numRequests; i++ {
gock.New(crmURL).
Get("/api/v5/test-method").
Reply(200).
BodyString(`{"success": true}`)
}
start := time.Now()
for i := 0; i < numRequests; i++ {
_, _, err := c.GetRequest("/test-method")
if err != nil {
t.Fatalf("Request %d failed: %v", i, err)
}
}
elapsed := time.Since(start)
minExpectedTime := time.Duration(numRequests-1) * time.Second / 10
assert.Truef(t, elapsed > minExpectedTime,
"Rate limiter not working correctly. Expected minimum time %v, got %v",
minExpectedTime, elapsed)
})
t.Run("Rate limiter respects telephony endpoint RPS", func(t *testing.T) {
c := client()
c.EnableRateLimiter(3)
defer gock.Off()
numRequests := 5
for i := 0; i < numRequests; i++ {
gock.New(crmURL).
Get("/api/v5/telephony/test-call").
Reply(200).
BodyString(`{"success": true}`)
}
start := time.Now()
for i := 0; i < numRequests; i++ {
_, _, err := c.GetRequest("/telephony/test-call")
if err != nil {
t.Fatalf("Request %d failed: %v", i, err)
}
}
elapsed := time.Since(start)
minExpectedTime := time.Duration(numRequests-1) * time.Second / 40
assert.Truef(t, elapsed > minExpectedTime,
"Rate limiter not working correctly for telephony. Expected minimum time %v, got %v",
minExpectedTime, elapsed)
})
t.Run("Rate limiter retries on 503 responses", func(t *testing.T) {
c := client()
c.EnableRateLimiter(3)
c.Debug = true
defer gock.Off()
gock.New(crmURL).
Get("/api/v5/retry-test").
Reply(503).
BodyString(`{"success": false, "errorMsg": "Rate limit exceeded"}`)
gock.New(crmURL).
Get("/api/v5/retry-test").
Reply(503).
BodyString(`{"success": false, "errorMsg": "Rate limit exceeded"}`)
gock.New(crmURL).
Get("/api/v5/retry-test").
Reply(200).
BodyString(`{"success": true}`)
_, status, err := c.GetRequest("/retry-test")
require.NoErrorf(t, err, "Request failed despite retries: %v", err)
assert.Equal(t, http.StatusOK, status)
assert.True(t, gock.IsDone(), "Not all expected requests were made")
})
t.Run("Rate limiter gives up after max attempts", func(t *testing.T) {
c := client()
c.EnableRateLimiter(2)
defer gock.OffAll()
for i := 0; i < 3; i++ {
gock.New(crmURL).
Get("/api/v5/retry-test").
Reply(503).
BodyString(`{"success": false, "errorMsg": "Rate limit exceeded"}`)
}
_, status, err := c.GetRequest("/retry-test")
assert.Equalf(t, http.StatusServiceUnavailable, status,
"Expected status 503 after max retries, got %d", status)
assert.ErrorIs(t, err, ErrRateLimited, "Expected error after max retries, got nil")
})
}
func TestPostRequest(t *testing.T) {

View File

@ -9,6 +9,8 @@ import (
var missingParameterMatcher = regexp.MustCompile(`^Parameter \'([\w\]\[\_\-]+)\' is missing$`)
var (
// ErrRateLimited will be returned if request was rate limited.
ErrRateLimited = NewAPIError("rate limit exceeded")
// ErrMissingCredentials will be returned if no API key was provided to the API.
ErrMissingCredentials = NewAPIError(`apiKey is missing`)
// ErrInvalidCredentials will be returned if provided API key is invalid.

View File

@ -5,14 +5,28 @@ import (
"net/http"
"reflect"
"strings"
"sync"
"time"
)
// ByID is "id" constant to use as `by` property in methods.
const ByID = "id"
// ByExternalId is "externalId" constant to use as `by` property in methods.
// ByExternalID is "externalId" constant to use as `by` property in methods.
const ByExternalID = "externalId"
// RateLimiter configuration constants
const (
regularPathRPS = 10 // API rate limit (requests per second).
telephonyPathRPS = 40 // Telephony API endpoints rate limit (requests per second).
regularDelay = time.Second / regularPathRPS // Delay between regular requests.
telephonyDelay = time.Second / telephonyPathRPS // Delay between telephony requests.
)
// HTTPStatusUnknown can return for the method `/api/v5/customers/upload`, `/api/v5/customers-corporate/upload`,
// `/api/v5/orders/upload`.
const HTTPStatusUnknown = 460
// Client type.
type Client struct {
URL string
@ -20,6 +34,15 @@ type Client struct {
Debug bool
httpClient *http.Client
logger BasicLogger
limiter *RateLimiter
mutex sync.Mutex
}
// RateLimiter manages API request rates to prevent hitting rate limits.
type RateLimiter struct {
maxAttempts uint // Maximum number of retry attempts (0 = infinite).
lastRequest time.Time // Time of the last request.
mutex sync.Mutex
}
// Pagination type.