wttr.in/internal/processor/processor.go

382 lines
9 KiB
Go
Raw Normal View History

2022-12-03 15:08:43 +00:00
package processor
2020-05-30 16:14:17 +00:00
import (
	"context"
	"fmt"
	"html"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	lru "github.com/hashicorp/golang-lru"

	"github.com/chubin/wttr.in/internal/config"
	geoip "github.com/chubin/wttr.in/internal/geo/ip"
	geoloc "github.com/chubin/wttr.in/internal/geo/location"
	"github.com/chubin/wttr.in/internal/routing"
	"github.com/chubin/wttr.in/internal/stats"
	"github.com/chubin/wttr.in/internal/util"
)
2022-12-03 15:08:43 +00:00
// plainTextAgents lists the lower-case User-Agent fragments that identify
// plain-text (non-browser) clients. A new slice is returned on every call,
// so callers may modify the result freely.
func plainTextAgents() []string {
	signatures := [...]string{
		"curl",
		"httpie",
		"lwp-request",
		"wget",
		"python-httpx",
		"python-requests",
		"openbsd ftp",
		"powershell",
		"fetch",
		"aiohttp",
		"http_get",
		"xh",
		"nushell",
	}

	return signatures[:]
}
// ResponseWithHeader is a complete HTTP response (status, headers, body)
// together with the bookkeeping needed to keep it in the response cache.
type ResponseWithHeader struct {
	// InProgress is true while the request that will produce this entry
	// is still being processed; such an entry carries no usable response yet.
	InProgress bool

	// Expires is the moment after which a cached entry must not be served.
	Expires time.Time

	// Body is the raw response body.
	Body []byte

	// Header holds the response headers.
	Header http.Header

	// StatusCode is the HTTP status code, e.g. 200.
	StatusCode int
}
// RequestProcessor handles incoming requests.
type RequestProcessor struct {
peakRequest30 sync.Map
peakRequest60 sync.Map
lruCache *lru.Cache
2022-12-03 14:39:34 +00:00
stats *stats.Stats
router routing.Router
upstreamTransport *http.Transport
2022-12-02 19:10:32 +00:00
config *config.Config
2022-12-04 15:55:14 +00:00
geoIPCache *geoip.Cache
geoLocation *geoloc.Cache
2022-11-27 14:51:41 +00:00
}
// NewRequestProcessor returns new RequestProcessor.
2022-12-02 19:10:32 +00:00
func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
lruCache, err := lru.New(config.Cache.Size)
2022-11-27 14:51:41 +00:00
if err != nil {
return nil, err
}
dialer := &net.Dialer{
2022-12-02 19:10:32 +00:00
Timeout: time.Duration(config.Uplink.Timeout) * time.Second,
KeepAlive: time.Duration(config.Uplink.Timeout) * time.Second,
DualStack: true,
}
transport := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
2022-12-02 19:10:32 +00:00
return dialer.DialContext(ctx, network, config.Uplink.Address)
},
}
2022-12-06 17:52:26 +00:00
geoCache, err := geoip.NewCache(config)
if err != nil {
return nil, err
}
geoLocation, err := geoloc.NewCache(config)
if err != nil {
return nil, err
}
2022-11-27 21:16:32 +00:00
rp := &RequestProcessor{
lruCache: lruCache,
2022-12-03 14:39:34 +00:00
stats: stats.New(),
upstreamTransport: transport,
2022-12-02 19:10:32 +00:00
config: config,
2022-12-06 17:52:26 +00:00
geoIPCache: geoCache,
geoLocation: geoLocation,
2022-11-27 21:16:32 +00:00
}
// Initialize routes.
rp.router.AddPath("/:stats", rp.stats)
rp.router.AddPath("/:geo-ip-get", rp.geoIPCache)
rp.router.AddPath("/:geo-ip-put", rp.geoIPCache)
rp.router.AddPath("/:geo-location", rp.geoLocation)
2022-11-27 21:16:32 +00:00
return rp, nil
2022-11-27 14:51:41 +00:00
}
// Start starts async request processor jobs, such as peak handling.
2022-12-11 13:28:34 +00:00
func (rp *RequestProcessor) Start() error {
return rp.startPeakHandling()
2022-11-27 14:51:41 +00:00
}
func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader, error) {
var (
response *ResponseWithHeader
ip = util.ReadUserIP(r)
)
2022-12-06 17:03:24 +00:00
if ip != "127.0.0.1" {
rp.stats.Inc("total")
}
2022-11-27 21:16:32 +00:00
// Main routing logic.
if rh := rp.router.Route(r); rh != nil {
result := rh.Response(r)
if result != nil {
2022-11-29 20:24:53 +00:00
return fromCadre(result), nil
2022-11-27 21:16:32 +00:00
}
}
if resp, ok := redirectInsecure(r); ok {
2022-11-27 21:16:32 +00:00
rp.stats.Inc("redirects")
2022-12-11 13:28:34 +00:00
return resp, nil
}
2020-06-08 05:26:38 +00:00
if dontCache(r) {
2022-11-27 21:16:32 +00:00
rp.stats.Inc("uncached")
2022-12-11 13:28:34 +00:00
return get(r, rp.upstreamTransport)
2020-06-08 05:26:38 +00:00
}
// processing cached request
2020-05-30 16:14:17 +00:00
cacheDigest := getCacheDigest(r)
2022-11-27 14:51:41 +00:00
rp.savePeakRequest(cacheDigest, r)
2020-05-30 16:14:17 +00:00
response = rp.processRequestFromCache(r)
if response != nil {
return response, nil
}
2020-05-30 16:14:17 +00:00
return rp.processUncachedRequest(r)
}
// processRequestFromCache processes requests using the cache.
// If no entry in cache found, nil is returned.
func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWithHeader {
var (
cacheEntry ResponseWithHeader
cacheDigest = getCacheDigest(r)
ok bool
)
cacheBody, _ := rp.lruCache.Get(cacheDigest)
cacheEntry, ok = cacheBody.(ResponseWithHeader)
if !ok {
return nil
}
// if after all attempts we still have no answer,
// we try to make the query on our own
for attempts := 0; attempts < 300; attempts++ {
if !ok || !cacheEntry.InProgress {
break
2020-05-30 16:14:17 +00:00
}
time.Sleep(30 * time.Millisecond)
cacheBody, _ = rp.lruCache.Get(cacheDigest)
v, ok := cacheBody.(ResponseWithHeader)
if ok {
cacheEntry = v
2020-05-30 16:14:17 +00:00
}
}
if cacheEntry.InProgress {
log.Printf("TIMEOUT: %s\n", cacheDigest)
}
if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) {
rp.stats.Inc("cache1")
2020-05-30 16:14:17 +00:00
return &cacheEntry
}
2022-11-27 21:16:32 +00:00
return nil
}
// processUncachedRequest processes requests that were not found in the cache.
func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWithHeader, error) {
var (
cacheDigest = getCacheDigest(r)
ip = util.ReadUserIP(r)
response *ResponseWithHeader
err error
)
// Response was not found in cache.
// Starting real handling.
format := r.URL.Query().Get("format")
if len(format) != 0 {
rp.stats.Inc("format")
if format == "j1" {
rp.stats.Inc("format=j1")
2022-12-04 15:55:14 +00:00
}
}
2022-12-04 15:55:14 +00:00
// Count, how many IP addresses are known.
_, err = rp.geoIPCache.Read(ip)
if err == nil {
rp.stats.Inc("geoip")
}
2022-11-27 21:16:32 +00:00
// Indicate, that the request is being handled.
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
response, err = get(r, rp.upstreamTransport)
if err != nil {
return nil, err
}
if response.StatusCode == 200 || response.StatusCode == 304 || response.StatusCode == 404 {
rp.lruCache.Add(cacheDigest, *response)
} else {
log.Printf("REMOVE: %d response for %s from cache\n", response.StatusCode, cacheDigest)
rp.lruCache.Remove(cacheDigest)
2020-05-30 16:14:17 +00:00
}
2022-12-11 13:28:34 +00:00
return response, nil
2020-05-30 16:14:17 +00:00
}
func get(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
client := &http.Client{
Transport: transport,
}
2020-05-30 16:14:17 +00:00
queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body)
if err != nil {
return nil, err
2020-05-30 16:14:17 +00:00
}
// proxyReq.Header.Set("Host", req.Host)
// proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
for header, values := range req.Header {
for _, value := range values {
proxyReq.Header.Add(header, value)
}
}
2022-11-20 09:03:31 +00:00
if proxyReq.Header.Get("X-Forwarded-For") == "" {
proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr))
}
2020-05-30 16:14:17 +00:00
res, err := client.Do(proxyReq)
if err != nil {
return nil, err
2020-05-30 16:14:17 +00:00
}
2022-12-11 13:28:34 +00:00
defer res.Body.Close()
2020-05-30 16:14:17 +00:00
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
2020-05-30 16:14:17 +00:00
}
return &ResponseWithHeader{
2020-05-30 16:14:17 +00:00
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: body,
Header: res.Header,
StatusCode: res.StatusCode,
}, nil
2020-05-30 16:14:17 +00:00
}
2022-12-11 13:28:34 +00:00
// getCacheDigest is an implementation of the cache.get_signature of original wttr.in.
2020-05-30 16:14:17 +00:00
func getCacheDigest(req *http.Request) string {
userAgent := req.Header.Get("User-Agent")
queryHost := req.Host
queryString := req.RequestURI
2022-12-03 15:08:43 +00:00
clientIPAddress := util.ReadUserIP(req)
2020-05-30 16:14:17 +00:00
lang := req.Header.Get("Accept-Language")
return fmt.Sprintf("%s:%s%s:%s:%s", userAgent, queryHost, queryString, clientIPAddress, lang)
}
2022-12-11 13:28:34 +00:00
// dontCache reports whether the response to req must not be cached.
//
// That is the case when the location part of the request URI (everything
// before the first "?") contains a colon, as in "/:stats". Colons in the
// query string do not count.
func dontCache(req *http.Request) bool {
	location := req.RequestURI
	if queryStart := strings.IndexByte(location, '?'); queryStart >= 0 {
		location = location[:queryStart]
	}

	return strings.IndexByte(location, ':') >= 0
}
// redirectInsecure returns redirection response, and bool value, if redirection was needed,
// if the query comes from a browser, and it is insecure.
//
// Insecure queries are marked by the frontend web server
// with X-Forwarded-Proto header:
2022-12-11 13:28:34 +00:00
// `proxy_set_header X-Forwarded-Proto $scheme;`.
func redirectInsecure(req *http.Request) (*ResponseWithHeader, bool) {
if isPlainTextAgent(req.Header.Get("User-Agent")) {
return nil, false
}
if req.TLS != nil || strings.ToLower(req.Header.Get("X-Forwarded-Proto")) == "https" {
return nil, false
}
target := "https://" + req.Host + req.URL.Path
if len(req.URL.RawQuery) > 0 {
target += "?" + req.URL.RawQuery
}
body := []byte(fmt.Sprintf(`<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">
<TITLE>301 Moved</TITLE></HEAD><BODY>
<H1>301 Moved</H1>
The document has moved
<A HREF="%s">here</A>.
</BODY></HTML>
`, target))
return &ResponseWithHeader{
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: body,
Header: http.Header{"Location": []string{target}},
StatusCode: 301,
}, true
}
2022-12-11 13:28:34 +00:00
// isPlainTextAgent returns true if userAgent is a plain-text agent.
func isPlainTextAgent(userAgent string) bool {
userAgentLower := strings.ToLower(userAgent)
2022-12-11 13:28:34 +00:00
for _, signature := range plainTextAgents() {
if strings.Contains(userAgentLower, signature) {
return true
}
2020-06-08 05:26:38 +00:00
}
2022-12-11 13:28:34 +00:00
2020-06-08 05:26:38 +00:00
return false
}
2020-05-30 16:14:17 +00:00
// randInt returns a pseudo-random integer from the half-open interval
// [min, max).
//
// If the interval is empty (max <= min), min is returned; rand.Intn would
// panic on the non-positive argument otherwise.
func randInt(min int, max int) int {
	if max <= min {
		return min
	}

	return min + rand.Intn(max-min)
}
2022-11-20 09:03:31 +00:00
// ipFromAddr returns the address part of an ADDR:PORT pair.
//
// IPv6 pairs are understood too: "[::1]:8080" gives "::1", without the
// brackets. If s is not a valid ADDR:PORT pair (for instance, an address
// without a port), s is returned unchanged.
func ipFromAddr(s string) string {
	host, _, err := net.SplitHostPort(s)
	if err != nil {
		return s
	}

	return host
}
2022-11-29 20:24:53 +00:00
// fromCadre converts Cadre into a responseWithHeader.
func fromCadre(cadre *routing.Cadre) *ResponseWithHeader {
return &ResponseWithHeader{
2022-11-29 20:24:53 +00:00
Body: cadre.Body,
Expires: cadre.Expires,
StatusCode: 200,
InProgress: false,
}
}