make rate limiter more concurrent-friendly
This commit is contained in:
parent
c80d4bf18f
commit
f3f87d1b7b
2
go.mod
2
go.mod
@ -4,6 +4,7 @@ go 1.22
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/google/go-querystring v1.0.0
|
github.com/google/go-querystring v1.0.0
|
||||||
|
github.com/jonboulle/clockwork v0.4.0
|
||||||
github.com/stretchr/testify v1.8.1
|
github.com/stretchr/testify v1.8.1
|
||||||
gopkg.in/h2non/gock.v1 v1.1.2
|
gopkg.in/h2non/gock.v1 v1.1.2
|
||||||
)
|
)
|
||||||
@ -12,5 +13,6 @@ require (
|
|||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||||
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
|
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/stretchr/objx v0.5.0 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
3
go.sum
3
go.sum
@ -5,12 +5,15 @@ github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASu
|
|||||||
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
||||||
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
|
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
|
||||||
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
|
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
|
||||||
|
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
|
||||||
|
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
|
||||||
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4=
|
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4=
|
||||||
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
|
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||||
|
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
|
||||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||||
|
@ -8,25 +8,25 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type token struct {
|
type token struct {
|
||||||
rps uint32
|
rps atomic.Uint32
|
||||||
lastUse time.Time
|
lastUse atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
type TokensBucket struct {
|
type TokensBucket struct {
|
||||||
maxRPS uint32
|
maxRPS uint32
|
||||||
mux sync.Mutex
|
tokens sync.Map
|
||||||
tokens map[string]*token
|
|
||||||
unusedTokenTime time.Duration
|
unusedTokenTime time.Duration
|
||||||
checkTokenTime time.Duration
|
checkTokenTime time.Duration
|
||||||
cancel atomic.Bool
|
cancel atomic.Bool
|
||||||
|
sleep sleeper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTokensBucket(maxRPS uint32, unusedTokenTime, checkTokenTime time.Duration) *TokensBucket {
|
func NewTokensBucket(maxRPS uint32, unusedTokenTime, checkTokenTime time.Duration) *TokensBucket {
|
||||||
bucket := &TokensBucket{
|
bucket := &TokensBucket{
|
||||||
maxRPS: maxRPS,
|
maxRPS: maxRPS,
|
||||||
tokens: map[string]*token{},
|
|
||||||
unusedTokenTime: unusedTokenTime,
|
unusedTokenTime: unusedTokenTime,
|
||||||
checkTokenTime: checkTokenTime,
|
checkTokenTime: checkTokenTime,
|
||||||
|
sleep: realSleeper{},
|
||||||
}
|
}
|
||||||
|
|
||||||
go bucket.deleteUnusedToken()
|
go bucket.deleteUnusedToken()
|
||||||
@ -35,27 +35,26 @@ func NewTokensBucket(maxRPS uint32, unusedTokenTime, checkTokenTime time.Duratio
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *TokensBucket) Obtain(id string) {
|
func (m *TokensBucket) Obtain(id string) {
|
||||||
m.mux.Lock()
|
val, ok := m.tokens.Load(id)
|
||||||
defer m.mux.Unlock()
|
if !ok {
|
||||||
|
token := &token{}
|
||||||
if _, ok := m.tokens[id]; !ok {
|
token.lastUse.Store(time.Now())
|
||||||
m.tokens[id] = &token{
|
token.rps.Store(1)
|
||||||
lastUse: time.Now(),
|
m.tokens.Store(id, token)
|
||||||
rps: 1,
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sleepTime := time.Second - time.Since(m.tokens[id].lastUse)
|
token := val.(*token)
|
||||||
if sleepTime < 0 {
|
sleepTime := time.Second - time.Since(token.lastUse.Load().(time.Time))
|
||||||
m.tokens[id].lastUse = time.Now()
|
if sleepTime <= 0 {
|
||||||
m.tokens[id].rps = 0
|
token.lastUse.Store(time.Now())
|
||||||
} else if m.tokens[id].rps >= m.maxRPS {
|
token.rps.Store(0)
|
||||||
time.Sleep(sleepTime)
|
} else if token.rps.Load() >= m.maxRPS {
|
||||||
m.tokens[id].lastUse = time.Now()
|
m.sleep.Sleep(sleepTime)
|
||||||
m.tokens[id].rps = 0
|
token.lastUse.Store(time.Now())
|
||||||
|
token.rps.Store(0)
|
||||||
}
|
}
|
||||||
m.tokens[id].rps++
|
token.rps.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func destructBasket(m *TokensBucket) {
|
func destructBasket(m *TokensBucket) {
|
||||||
@ -67,15 +66,25 @@ func (m *TokensBucket) deleteUnusedToken() {
|
|||||||
if m.cancel.Load() {
|
if m.cancel.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m.mux.Lock()
|
|
||||||
|
|
||||||
for id, token := range m.tokens {
|
m.tokens.Range(func(key, value any) bool {
|
||||||
if time.Since(token.lastUse) >= m.unusedTokenTime {
|
id, token := key.(string), value.(*token)
|
||||||
delete(m.tokens, id)
|
if time.Since(token.lastUse.Load().(time.Time)) >= m.unusedTokenTime {
|
||||||
|
m.tokens.Delete(id)
|
||||||
}
|
}
|
||||||
}
|
return false
|
||||||
m.mux.Unlock()
|
})
|
||||||
|
|
||||||
time.Sleep(m.checkTokenTime)
|
m.sleep.Sleep(m.checkTokenTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sleeper interface {
|
||||||
|
Sleep(time.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
type realSleeper struct{}
|
||||||
|
|
||||||
|
func (s realSleeper) Sleep(d time.Duration) {
|
||||||
|
time.Sleep(d)
|
||||||
|
}
|
||||||
|
72
v1/rate_limit_test.go
Normal file
72
v1/rate_limit_test.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
package v1
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/jonboulle/clockwork"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TokensBucketTest struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTokensBucket(t *testing.T) {
|
||||||
|
suite.Run(t, new(TokensBucketTest))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TokensBucketTest) Test_NewTokensBucket() {
|
||||||
|
t.Assert().NotNil(NewTokensBucket(10, time.Hour, time.Hour))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TokensBucketTest) Test_Obtain_NoThrottle() {
|
||||||
|
tb := NewTokensBucket(100, time.Hour, time.Minute)
|
||||||
|
start := time.Now()
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
tb.Obtain("a")
|
||||||
|
}
|
||||||
|
t.Assert().True(time.Since(start) < time.Second) // check that rate limiter did not perform throttle.
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TokensBucketTest) Test_Obtain_Sleep() {
|
||||||
|
clock := &fakeSleeper{}
|
||||||
|
tb := NewTokensBucket(100, time.Hour, time.Minute)
|
||||||
|
tb.cancel.Store(true) // prevent unused token removal.
|
||||||
|
tb.sleep = clock
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 301; i++ {
|
||||||
|
tb.Obtain("a")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
t.Assert().Equal(3, int(clock.total.Load()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TokensBucketTest) Test_Obtain_AddRPS() {
|
||||||
|
clock := clockwork.NewFakeClock()
|
||||||
|
tb := NewTokensBucket(100, time.Hour, time.Minute)
|
||||||
|
tb.sleep = clock
|
||||||
|
tb.Obtain("a")
|
||||||
|
clock.Advance(time.Minute * 2)
|
||||||
|
|
||||||
|
item, found := tb.tokens.Load("a")
|
||||||
|
t.Require().True(found)
|
||||||
|
t.Assert().Equal(1, int(item.(*token).rps.Load()))
|
||||||
|
tb.Obtain("a")
|
||||||
|
t.Assert().Equal(2, int(item.(*token).rps.Load()))
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeSleeper struct {
|
||||||
|
total atomic.Uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *fakeSleeper) Sleep(time.Duration) {
|
||||||
|
s.total.Add(1)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user