Throttle rate of requests per second (#287)

* Add the functionality to perform req/sec limiting (for humans)

* Add documentation
This commit is contained in:
Joona Hoikkala 2020-08-30 13:51:41 +03:00 committed by GitHub
parent e752339fc8
commit 0ce941326b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 153 additions and 25 deletions

View file

@ -1,6 +1,7 @@
## Changelog
- master
- New
- New CLI flag `-rate` to set maximum rate of requests per second. The adjustment is dynamic.
- Changed

View file

@ -132,6 +132,7 @@ GENERAL OPTIONS:
-maxtime Maximum running time in seconds for entire process. (default: 0)
-maxtime-job Maximum running time in seconds per job. (default: 0)
-p Seconds of `delay` between requests, or a range of random delay. For example "0.1" or "0.1-2.0"
-rate Rate of requests per second (default: 0)
-s Do not print additional information (silent mode) (default: false)
-sa Stop on all error cases. Implies -sf and -se. (default: false)
-se Stop on spurious errors (default: false)

View file

@ -61,7 +61,7 @@ func Usage() {
Description: "",
Flags: make([]UsageFlag, 0),
Hidden: false,
ExpectedFlags: []string{"ac", "acc", "c", "maxtime", "maxtime-job", "p", "s", "sa", "se", "sf", "t", "v", "V"},
ExpectedFlags: []string{"ac", "acc", "c", "maxtime", "maxtime-job", "p", "rate", "s", "sa", "se", "sf", "t", "v", "V"},
}
u_compat := UsageSection{
Name: "COMPATIBILITY OPTIONS",

12
main.go
View file

@ -35,6 +35,7 @@ type cliOptions struct {
matcherWords string
matcherLines string
proxyURL string
rate int
replayProxyURL string
request string
requestProto string
@ -98,6 +99,7 @@ func main() {
flag.StringVar(&opts.matcherWords, "mw", "", "Match amount of words in response")
flag.StringVar(&opts.matcherLines, "ml", "", "Match amount of lines in response")
flag.StringVar(&opts.proxyURL, "x", "", "HTTP Proxy URL")
flag.IntVar(&opts.rate, "rate", 0, "Rate of requests per second")
flag.StringVar(&opts.request, "request", "", "File containing the raw http request")
flag.StringVar(&opts.requestProto, "request-proto", "https", "Protocol to use along with raw request")
flag.StringVar(&conf.Method, "X", "GET", "HTTP method to use")
@ -167,9 +169,7 @@ func main() {
}
func prepareJob(conf *ffuf.Config) (*ffuf.Job, error) {
job := &ffuf.Job{
Config: conf,
}
job := ffuf.NewJob(conf)
errs := ffuf.NewMultierror()
var err error
inputprovider, err := input.NewInputProvider(conf)
@ -496,6 +496,12 @@ func prepareConfig(parseOpts *cliOptions, conf *ffuf.Config) error {
}
}
if parseOpts.rate < 0 {
conf.Rate = 0
} else {
conf.Rate = int64(parseOpts.rate)
}
return errs.ErrorOrNil()
}

View file

@ -43,6 +43,7 @@ type Config struct {
MaxTimeJob int `json:"maxtime_job"`
Recursion bool `json:"recursion"`
RecursionDepth int `json:"recursion_depth"`
Rate int64 `json:"rate"`
}
type InputProviderConfig struct {
@ -76,12 +77,13 @@ func NewConfig(ctx context.Context) Config {
conf.Extensions = make([]string, 0)
conf.Timeout = 10
// Progress update frequency, in milliseconds
conf.ProgressFrequency = 100
conf.ProgressFrequency = 125
conf.DirSearchCompat = false
conf.Verbose = false
conf.MaxTime = 0
conf.MaxTimeJob = 0
conf.Recursion = false
conf.RecursionDepth = 0
conf.Rate = 0
return conf
}

View file

@ -28,6 +28,7 @@ type Job struct {
Count403 int
Count429 int
Error string
Rate *RateThrottle
startTime time.Time
startTimeJob time.Time
queuejobs []QueueJob
@ -40,8 +41,9 @@ type QueueJob struct {
depth int
}
func NewJob(conf *Config) Job {
func NewJob(conf *Config) *Job {
var j Job
j.Config = conf
j.Counter = 0
j.ErrorCounter = 0
j.SpuriousErrorCounter = 0
@ -50,7 +52,8 @@ func NewJob(conf *Config) Job {
j.queuepos = 0
j.queuejobs = make([]QueueJob, 0)
j.currentDepth = 0
return j
j.Rate = NewRateThrottle(conf)
return &j
}
//incError increments the error counter
@ -132,10 +135,24 @@ func (j *Job) prepareQueueJob() {
j.queuepos += 1
}
func (j *Job) sleepIfNeeded() {
var sleepDuration time.Duration
if j.Config.Delay.HasDelay {
if j.Config.Delay.IsRange {
sTime := j.Config.Delay.Min + rand.Float64()*(j.Config.Delay.Max-j.Config.Delay.Min)
sleepDuration = time.Duration(sTime * 1000)
} else {
sleepDuration = time.Duration(j.Config.Delay.Min * 1000)
}
sleepDuration = sleepDuration * time.Millisecond
}
time.Sleep(sleepDuration)
}
func (j *Job) startExecution() {
var wg sync.WaitGroup
wg.Add(1)
go j.runProgress(&wg)
go j.runBackgroundTasks(&wg)
//Limiter blocks after reaching the buffer, ensuring limited concurrency
limiter := make(chan bool, j.Config.Threads)
@ -147,26 +164,21 @@ func (j *Job) startExecution() {
defer j.Output.Warning(j.Error)
break
}
limiter <- true
nextInput := j.Input.Value()
nextPosition := j.Input.Position()
wg.Add(1)
j.Counter++
go func() {
defer func() { <-limiter }()
defer wg.Done()
threadStart := time.Now()
j.runTask(nextInput, nextPosition, false)
if j.Config.Delay.HasDelay {
var sleepDurationMS time.Duration
if j.Config.Delay.IsRange {
sTime := j.Config.Delay.Min + rand.Float64()*(j.Config.Delay.Max-j.Config.Delay.Min)
sleepDurationMS = time.Duration(sTime * 1000)
} else {
sleepDurationMS = time.Duration(j.Config.Delay.Min * 1000)
}
time.Sleep(sleepDurationMS * time.Millisecond)
}
j.sleepIfNeeded()
j.Rate.Throttle()
threadEnd := time.Now()
j.Rate.Tick(threadStart, threadEnd)
}()
if !j.RunningJob {
@ -190,7 +202,7 @@ func (j *Job) interruptMonitor() {
}()
}
func (j *Job) runProgress(wg *sync.WaitGroup) {
func (j *Job) runBackgroundTasks(wg *sync.WaitGroup) {
defer wg.Done()
totalProgress := j.Input.Total()
for j.Counter <= totalProgress {
@ -198,16 +210,14 @@ func (j *Job) runProgress(wg *sync.WaitGroup) {
if !j.Running {
break
}
j.updateProgress()
if j.Counter == totalProgress {
return
}
if !j.RunningJob {
return
}
j.Rate.Adjust()
time.Sleep(time.Millisecond * time.Duration(j.Config.ProgressFrequency))
}
}
@ -217,6 +227,7 @@ func (j *Job) updateProgress() {
StartedAt: j.startTimeJob,
ReqCount: j.Counter,
ReqTotal: j.Input.Total(),
ReqSec: j.Rate.CurrentRate(),
QueuePos: j.queuepos,
QueueTotal: len(j.queuejobs),
ErrorCount: j.ErrorCounter,

View file

@ -8,6 +8,7 @@ type Progress struct {
StartedAt time.Time
ReqCount int
ReqTotal int
ReqSec int64
QueuePos int
QueueTotal int
ErrorCount int

106
pkg/ffuf/rate.go Normal file
View file

@ -0,0 +1,106 @@
package ffuf
import (
"container/ring"
"sync"
"time"
)
type RateThrottle struct {
rateCounter *ring.Ring
RateAdjustment float64
RateAdjustmentPos int
Config *Config
RateMutex sync.Mutex
lastAdjustment time.Time
}
func NewRateThrottle(conf *Config) *RateThrottle {
return &RateThrottle{
rateCounter: ring.New(conf.Threads),
RateAdjustment: 0,
RateAdjustmentPos: 0,
Config: conf,
lastAdjustment: time.Now(),
}
}
//CurrentRate calculates requests/second value from circular list of rate
func (r *RateThrottle) CurrentRate() int64 {
n := r.rateCounter.Len()
var total int64
total = 0
r.rateCounter.Do(func(r interface{}) {
switch val := r.(type) {
case int64:
total += val
default:
// circular list entry was nil, happens when < number_of_threads responses have been recorded.
// the total number of entries is less than length of the list
n -= 1
}
})
if total > 0 {
avg := total / int64(n)
return time.Second.Nanoseconds() * int64(r.Config.Threads) / avg
}
return 0
}
//rateTick adds a new duration measurement tick to rate counter
func (r *RateThrottle) Tick(start, end time.Time) {
if start.Before(r.lastAdjustment) {
// We don't want to store data for threads started pre-adjustment
return
}
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
dur := end.Sub(start).Nanoseconds()
r.rateCounter = r.rateCounter.Next()
r.RateAdjustmentPos += 1
r.rateCounter.Value = dur
}
func (r *RateThrottle) Throttle() {
if r.Config.Rate == 0 {
// No throttling
return
}
if r.RateAdjustment > 0.0 {
delayNS := float64(time.Second.Nanoseconds()) * r.RateAdjustment
time.Sleep(time.Nanosecond * time.Duration(delayNS))
}
}
//Adjust changes the RateAdjustment value, which is multiplier of second to pause between requests in a thread
func (r *RateThrottle) Adjust() {
if r.RateAdjustmentPos < r.Config.Threads {
// Do not adjust if we don't have enough data yet
return
}
r.RateMutex.Lock()
defer r.RateMutex.Unlock()
currentRate := r.CurrentRate()
if r.RateAdjustment == 0.0 {
if currentRate > r.Config.Rate {
// If we're adjusting the rate for the first time, start at a safe point (0.2sec)
r.RateAdjustment = 0.2
return
} else {
// NOOP
return
}
}
difference := float64(currentRate) / float64(r.Config.Rate)
if r.RateAdjustment < 0.00001 && difference < 0.9 {
// Reset the rate adjustment as throttling is not relevant at current speed
r.RateAdjustment = 0.0
} else {
r.RateAdjustment = r.RateAdjustment * difference
}
// Reset the counters
r.lastAdjustment = time.Now()
r.RateAdjustmentPos = 0
}

View file

@ -154,9 +154,9 @@ func (s *Stdoutput) Progress(status ffuf.Progress) {
dur := time.Now().Sub(status.StartedAt)
runningSecs := int(dur / time.Second)
var reqRate int
var reqRate int64
if runningSecs > 0 {
reqRate = int(status.ReqCount / runningSecs)
reqRate = status.ReqSec
} else {
reqRate = 0
}