Enhanced rate limiting (#620)

* Enhanced rate limiting

* Use time.Ticker correctly
This commit is contained in:
Joona Hoikkala 2023-01-04 11:23:32 +02:00 committed by GitHub
parent 1a684a9c88
commit 2ce22175da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 110 deletions

View file

@ -238,7 +238,7 @@ func (j *Job) startExecution() {
}
//Limiter blocks after reaching the buffer, ensuring limited concurrency
limiter := make(chan bool, j.Config.Threads)
threadlimiter := make(chan bool, j.Config.Threads)
for j.Input.Next() && !j.skipQueue {
// Check if we should stop the process
@ -249,23 +249,25 @@ func (j *Job) startExecution() {
break
}
j.pauseWg.Wait()
limiter <- true
// Handle the rate & thread limiting
threadlimiter <- true
// Ratelimiter handles the rate ticker
<-j.Rate.RateLimiter.C
nextInput := j.Input.Value()
nextPosition := j.Input.Position()
wg.Add(1)
j.Counter++
go func() {
defer func() { <-limiter }()
defer func() { <-threadlimiter }()
defer wg.Done()
threadStart := time.Now()
j.runTask(nextInput, nextPosition, false)
j.sleepIfNeeded()
j.Rate.Throttle()
threadEnd := time.Now()
j.Rate.Tick(threadStart, threadEnd)
}()
if !j.RunningJob {
defer j.Output.Warning(j.Error)
return
@ -306,7 +308,6 @@ func (j *Job) runBackgroundTasks(wg *sync.WaitGroup) {
if !j.RunningJob {
return
}
j.Rate.Adjust()
time.Sleep(time.Millisecond * time.Duration(j.Config.ProgressFrequency))
}
}

View file

@ -8,99 +8,75 @@ import (
type RateThrottle struct {
rateCounter *ring.Ring
RateAdjustment float64
RateAdjustmentPos int
Config *Config
RateMutex sync.Mutex
RateLimiter *time.Ticker
lastAdjustment time.Time
}
func NewRateThrottle(conf *Config) *RateThrottle {
return &RateThrottle{
rateCounter: ring.New(conf.Threads),
RateAdjustment: 0,
RateAdjustmentPos: 0,
r := &RateThrottle{
Config: conf,
lastAdjustment: time.Now(),
}
if conf.Rate > 0 {
r.rateCounter = ring.New(int(conf.Rate * 5))
} else {
r.rateCounter = ring.New(conf.Threads * 5)
}
if conf.Rate > 0 {
ratemicros := 1000000 / conf.Rate
r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros))
} else {
//Million rps is probably a decent hardcoded upper speedlimit
r.RateLimiter = time.NewTicker(time.Microsecond * 1)
}
return r
}
// CurrentRate calculates requests/second value from circular list of rate
func (r *RateThrottle) CurrentRate() int64 {
n := r.rateCounter.Len()
var total int64
total = 0
lowest := int64(0)
highest := int64(0)
r.rateCounter.Do(func(r interface{}) {
switch val := r.(type) {
case int64:
total += val
if lowest == 0 || val < lowest {
lowest = val
}
if val > highest {
highest = val
}
default:
// circular list entry was nil, happens when < number_of_threads responses have been recorded.
// circular list entry was nil, happens when < number_of_threads * 5 responses have been recorded.
// the total number of entries is less than length of the list
n -= 1
}
})
if total > 0 {
avg := total / int64(n)
return time.Second.Nanoseconds() * int64(r.Config.Threads) / avg
earliest := time.UnixMicro(lowest)
latest := time.UnixMicro(highest)
elapsed := latest.Sub(earliest)
if n > 0 && elapsed.Milliseconds() > 1 {
return int64(1000 * int64(n) / elapsed.Milliseconds())
}
return 0
}
return 0
func (r *RateThrottle) ChangeRate(rate int) {
ratemicros := 1000000 / rate
r.RateLimiter.Stop()
r.RateLimiter = time.NewTicker(time.Microsecond * time.Duration(ratemicros))
r.Config.Rate = int64(rate)
// reset the rate counter
r.rateCounter = ring.New(rate * 5)
}
// rateTick adds a new duration measurement tick to rate counter
func (r *RateThrottle) Tick(start, end time.Time) {
if start.Before(r.lastAdjustment) {
// We don't want to store data for threads started pre-adjustment
return
}
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
dur := end.Sub(start).Nanoseconds()
r.rateCounter = r.rateCounter.Next()
r.RateAdjustmentPos += 1
r.rateCounter.Value = dur
}
func (r *RateThrottle) Throttle() {
if r.Config.Rate == 0 {
// No throttling
return
}
if r.RateAdjustment > 0.0 {
delayNS := float64(time.Second.Nanoseconds()) * r.RateAdjustment
time.Sleep(time.Nanosecond * time.Duration(delayNS))
}
}
//Adjust changes the RateAdjustment value, which is multiplier of second to pause between requests in a thread
func (r *RateThrottle) Adjust() {
if r.RateAdjustmentPos < r.Config.Threads {
// Do not adjust if we don't have enough data yet
return
}
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
currentRate := r.CurrentRate()
if r.RateAdjustment == 0.0 {
if currentRate > r.Config.Rate {
// If we're adjusting the rate for the first time, start at a safe point (0.2sec)
r.RateAdjustment = 0.2
return
} else {
// NOOP
return
}
}
difference := float64(currentRate) / float64(r.Config.Rate)
if r.RateAdjustment < 0.00001 && difference < 0.9 {
// Reset the rate adjustment as throttling is not relevant at current speed
r.RateAdjustment = 0.0
} else {
r.RateAdjustment = r.RateAdjustment * difference
}
// Reset the counters
r.lastAdjustment = time.Now()
r.RateAdjustmentPos = 0
r.rateCounter.Value = end.UnixMicro()
}

View file

@ -177,6 +177,20 @@ func (i *interactive) handleInput(in []byte) {
case "queueskip":
i.Job.SkipQueue()
i.Job.Output.Info("Skipping to the next queued job")
case "rate":
if len(args) < 2 {
i.Job.Output.Error("Please define the new rate")
} else if len(args) > 2 {
i.Job.Output.Error("Too many arguments for \"rate\"")
} else {
newrate, err := strconv.Atoi(args[1])
if err != nil {
i.Job.Output.Error(fmt.Sprintf("Could not adjust rate: %s", err))
} else {
i.Job.Rate.ChangeRate(newrate)
}
}
default:
if i.paused {
i.Job.Output.Warning(fmt.Sprintf("Unknown command: \"%s\". Enter \"help\" for a list of available commands", args[0]))
@ -278,6 +292,7 @@ func (i *interactive) printHelp() {
ft = "(active: " + filter.Repr() + ")"
}
}
rate := fmt.Sprintf("(active: %d)", i.Job.Config.Rate)
help := `
available commands:
afc [value] - append to status code filter %s
@ -290,6 +305,7 @@ available commands:
fs [value] - (re)configure size filter %s
aft [value] - append to time filter %s
ft [value] - (re)configure time filter %s
rate [value] - adjust rate of requests per second %s
queueshow - show job queue
queuedel [number] - delete a job in the queue
queueskip - advance to the next queued job
@ -299,5 +315,5 @@ available commands:
savejson [filename] - save current matches to a file
help - you are looking at it
`
i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft))
i.Job.Output.Raw(fmt.Sprintf(help, fc, fc, fl, fl, fw, fw, fs, fs, ft, ft, rate))
}