1
0
mirror of synced 2024-11-29 00:25:39 +03:00
mg-transport-api-client-go/v1/rate_limit.go

107 lines
2.3 KiB
Go
Raw Normal View History

2024-02-15 21:59:10 +03:00
package v1
import (
"runtime"
"sync"
"sync/atomic"
"time"
)
2024-04-17 11:24:02 +03:00
// NoopLimiter is a ready-to-use Limiter whose Obtain returns immediately,
// i.e. it never throttles the caller.
var NoopLimiter Limiter = new(noopLimiter)
2024-02-15 21:59:10 +03:00
// token holds the rate-limiting state tracked for a single id.
type token struct {
	// rps counts the requests granted in the current window.
	rps atomic.Uint32
	// lastUse stores a time.Time: the moment the current window was opened.
	// It is also what the cleanup loop compares against to evict idle entries.
	lastUse atomic.Value
}
2024-04-17 11:24:02 +03:00
// Limiter is the abstraction over a rate limiting strategy.
type Limiter interface {
	// Obtain acquires the right to send a request for the given id.
	// Implementations are expected to block the calling goroutine for as
	// long as it has to wait.
	Obtain(id string)
}
// TokensBucket implements basic Limiter with fixed window and fixed amount of tokens per window.
2024-02-15 21:59:10 +03:00
type TokensBucket struct {
maxRPS uint32
tokens sync.Map
2024-02-15 21:59:10 +03:00
unusedTokenTime time.Duration
checkTokenTime time.Duration
cancel atomic.Bool
sleep sleeper
2024-02-15 21:59:10 +03:00
}
2024-04-17 11:24:02 +03:00
// NewTokensBucket constructs TokensBucket with provided parameters.
func NewTokensBucket(maxRPS uint32, unusedTokenTime, checkTokenTime time.Duration) Limiter {
2024-02-15 21:59:10 +03:00
bucket := &TokensBucket{
maxRPS: maxRPS,
unusedTokenTime: unusedTokenTime,
checkTokenTime: checkTokenTime,
sleep: realSleeper{},
2024-02-15 21:59:10 +03:00
}
go bucket.deleteUnusedToken()
runtime.SetFinalizer(bucket, destructBasket)
return bucket
}
func (m *TokensBucket) Obtain(id string) {
val, ok := m.tokens.Load(id)
if !ok {
token := &token{}
token.lastUse.Store(time.Now())
token.rps.Store(1)
m.tokens.Store(id, token)
2024-02-15 21:59:10 +03:00
return
}
token := val.(*token)
sleepTime := time.Second - time.Since(token.lastUse.Load().(time.Time))
if sleepTime <= 0 {
token.lastUse.Store(time.Now())
token.rps.Store(0)
} else if token.rps.Load() >= m.maxRPS {
m.sleep.Sleep(sleepTime)
token.lastUse.Store(time.Now())
token.rps.Store(0)
2024-02-15 21:59:10 +03:00
}
token.rps.Add(1)
2024-02-15 21:59:10 +03:00
}
// destructBasket is the finalizer NewTokensBucket registers for a bucket.
// It raises the cancel flag, which the cleanup loop checks before each pass.
func destructBasket(bucket *TokensBucket) {
	bucket.cancel.Store(true)
}
func (m *TokensBucket) deleteUnusedToken() {
for {
if m.cancel.Load() {
return
}
m.tokens.Range(func(key, value any) bool {
id, token := key.(string), value.(*token)
if time.Since(token.lastUse.Load().(time.Time)) >= m.unusedTokenTime {
m.tokens.Delete(id)
2024-02-15 21:59:10 +03:00
}
return false
})
2024-02-15 21:59:10 +03:00
m.sleep.Sleep(m.checkTokenTime)
2024-02-15 21:59:10 +03:00
}
}
2024-04-17 11:24:02 +03:00
// noopLimiter is a limiter that never makes the caller wait.
type noopLimiter struct{}

// Obtain returns immediately regardless of the id passed in.
func (*noopLimiter) Obtain(_ string) {}
// sleeper abstracts the act of pausing the current goroutine so that tests
// can substitute their own implementation.
type sleeper interface {
	// Sleep pauses for the duration d.
	Sleep(d time.Duration)
}
// realSleeper is the sleeper backed by the standard library.
type realSleeper struct{}

// Sleep pauses the calling goroutine for pause by delegating to time.Sleep.
func (realSleeper) Sleep(pause time.Duration) {
	time.Sleep(pause)
}