wttr.in/internal/processor/peak.go

99 lines
2.1 KiB
Go
Raw Normal View History

2022-12-03 15:08:43 +00:00
package processor
2020-05-30 16:14:17 +00:00
import (
"log"
"net/http"
"sync"
"time"
"github.com/robfig/cron"
)
2022-12-11 13:28:34 +00:00
func (rp *RequestProcessor) startPeakHandling() error {
var err error
2020-05-30 16:14:17 +00:00
c := cron.New()
// cronTime := fmt.Sprintf("%d,%d * * * *", 30-prefetchInterval/60, 60-prefetchInterval/60)
2022-12-11 13:28:34 +00:00
err = c.AddFunc(
2022-11-27 14:51:41 +00:00
"24 * * * *",
func() { rp.prefetchPeakRequests(&rp.peakRequest30) },
)
2022-12-11 13:28:34 +00:00
if err != nil {
return err
}
err = c.AddFunc(
2022-11-27 14:51:41 +00:00
"54 * * * *",
func() { rp.prefetchPeakRequests(&rp.peakRequest60) },
)
2022-12-11 13:28:34 +00:00
if err != nil {
return err
}
2020-05-30 16:14:17 +00:00
c.Start()
2022-12-11 13:28:34 +00:00
return nil
2020-05-30 16:14:17 +00:00
}
// registerPeakRequest registers requests coming in the peak time.
// Such requests can be prefetched afterwards just before the peak time comes.
2022-11-27 14:51:41 +00:00
func (rp *RequestProcessor) savePeakRequest(cacheDigest string, r *http.Request) {
2022-12-11 13:28:34 +00:00
if _, min, _ := time.Now().Clock(); min == 30 {
2022-11-27 14:51:41 +00:00
rp.peakRequest30.Store(cacheDigest, *r)
2020-05-30 16:14:17 +00:00
} else if min == 0 {
2022-11-27 14:51:41 +00:00
rp.peakRequest60.Store(cacheDigest, *r)
2020-05-30 16:14:17 +00:00
}
}
2022-12-11 13:28:34 +00:00
func (rp *RequestProcessor) prefetchRequest(r *http.Request) error {
_, err := rp.ProcessRequest(r)
return err
2020-05-30 16:14:17 +00:00
}
// syncMapLen returns the number of entries currently stored in sm.
//
// sync.Map has no length method, so the entries are counted by walking the
// map with Range. Range only calls back for keys that are present, so every
// invocation is exactly one entry and no existence check is needed. In
// particular an entry whose key is the empty string is counted like any other
// and no longer cuts the walk short.
//
// The result is a snapshot: Store or Delete calls running concurrently with
// the walk may not be reflected in it.
func syncMapLen(sm *sync.Map) int {
	count := 0
	sm.Range(func(_, _ interface{}) bool {
		count++
		return true // keep going until every entry has been visited
	})

	return count
}
2022-11-27 14:51:41 +00:00
func (rp *RequestProcessor) prefetchPeakRequests(peakRequestMap *sync.Map) {
2020-05-30 16:14:17 +00:00
peakRequestLen := syncMapLen(peakRequestMap)
if peakRequestLen == 0 {
return
}
2022-11-20 16:54:42 +00:00
log.Printf("PREFETCH: Prefetching %d requests\n", peakRequestLen)
2022-12-02 19:10:32 +00:00
sleepBetweenRequests := time.Duration(rp.config.Uplink.PrefetchInterval*1000/peakRequestLen) * time.Millisecond
2020-05-30 16:14:17 +00:00
peakRequestMap.Range(func(key interface{}, value interface{}) bool {
req, ok := value.(http.Request)
if !ok {
log.Println("missing value for:", key)
return true
}
2020-06-01 12:18:03 +00:00
go func(r http.Request) {
2022-12-11 13:28:34 +00:00
err := rp.prefetchRequest(&r)
if err != nil {
log.Println("prefetch request:", err)
}
}(req)
2020-05-30 16:14:17 +00:00
peakRequestMap.Delete(key)
time.Sleep(sleepBetweenRequests)
2022-12-11 13:28:34 +00:00
2020-05-30 16:14:17 +00:00
return true
})
}