From eca6f180a3f882578e989f864bf4ff6a5c58199f Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sun, 26 Sep 2021 03:41:40 +0000 Subject: [PATCH] Add RateLimit --- client/ratelimit.go | 75 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 client/ratelimit.go diff --git a/client/ratelimit.go b/client/ratelimit.go new file mode 100644 index 0000000..c7adf4c --- /dev/null +++ b/client/ratelimit.go @@ -0,0 +1,75 @@ +package client + +import "sync" +import "time" + +type RateLimit struct { + perSecond float64 + balance float64 + limit float64 + updated time.Time + mu sync.Mutex +} + +func NewRateLimit(perSecond, limit float64) *RateLimit { + return &RateLimit{ + perSecond: perSecond, + balance: limit, + limit: limit, + updated: time.Now(), + } +} + +func NewRateLimitPerMinute(perMinute, limit float64) *RateLimit { + return NewRateLimit(perMinute/60, limit) +} + +// Acquire sufficient rate quota to execute 1 operation +func (rl *RateLimit) Acquire1() { + rl.AcquireN(1.0) +} + +// Acquire sufficient rate quota to execute /cost/ operations +func (rl *RateLimit) AcquireN(cost float64) { + for { + rl.mu.Lock() + + rl.replenish() + + if rl.balance >= cost { + rl.balance -= cost + rl.mu.Unlock() + return + } + + costDelta := cost - rl.balance + sleep := time.Duration(costDelta / rl.perSecond * float64(time.Second)) + rl.mu.Unlock() + time.Sleep(sleep) + } +} + +// Handle a Retry-After server response +func (rl *RateLimit) RetryAfter(seconds int64) { + rl.mu.Lock() + defer rl.mu.Unlock() + + target := 1.0 - (float64(seconds) * rl.perSecond) + if target < rl.balance { + rl.balance = target + } +} + +// Must be called with rl.mu already locked +func (rl *RateLimit) replenish() { + now := time.Now() + timeDelta := now.Sub(rl.updated) + balanceDelta := timeDelta.Seconds() * rl.perSecond + + rl.balance += balanceDelta + if rl.balance > rl.limit { + rl.balance = rl.limit + } + + rl.updated = now +}