Merge branch 'chubin:master' into master

This commit is contained in:
ArshA 2024-03-03 13:54:32 +03:30 committed by GitHub
commit 104c9a5730
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 160 additions and 79 deletions

View file

@ -50,8 +50,17 @@ type Server struct {
// Uplink configuration. // Uplink configuration.
type Uplink struct { type Uplink struct {
// Address contains address of the uplink server in form IP:PORT. // Address1 contains address of the uplink server in form IP:PORT
Address string `yaml:"address,omitempty"` // for format=j1 queries.
Address1 string `yaml:"address1,omitempty"`
// Address2 contains address of the uplink server in form IP:PORT
// for format=* queries.
Address2 string `yaml:"address2,omitempty"`
// Address3 contains address of the uplink server in form IP:PORT
// for all other queries.
Address3 string `yaml:"address3,omitempty"`
// Timeout for upstream queries. // Timeout for upstream queries.
Timeout int `yaml:"timeout,omitempty"` Timeout int `yaml:"timeout,omitempty"`
@ -85,7 +94,7 @@ type Geo struct {
LocationCacheType types.CacheType `yaml:"locationCacheType,omitempty"` LocationCacheType types.CacheType `yaml:"locationCacheType,omitempty"`
Nominatim []Nominatim Nominatim []Nominatim `yaml:"nominatim"`
} }
type Nominatim struct { type Nominatim struct {
@ -140,7 +149,9 @@ func Default() *Config {
TLSKeyFile: "/wttr.in/etc/privkey.pem", TLSKeyFile: "/wttr.in/etc/privkey.pem",
}, },
Uplink{ Uplink{
Address: "127.0.0.1:9002", Address1: "127.0.0.1:9002",
Address2: "127.0.0.1:9002",
Address3: "127.0.0.1:9002",
Timeout: 30, Timeout: 30,
PrefetchInterval: 300, PrefetchInterval: 300,
}, },

View file

@ -10,12 +10,13 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/samonzeweb/godb"
"github.com/samonzeweb/godb/adapters/sqlite"
"github.com/chubin/wttr.in/internal/config" "github.com/chubin/wttr.in/internal/config"
"github.com/chubin/wttr.in/internal/routing" "github.com/chubin/wttr.in/internal/routing"
"github.com/chubin/wttr.in/internal/types" "github.com/chubin/wttr.in/internal/types"
"github.com/chubin/wttr.in/internal/util" "github.com/chubin/wttr.in/internal/util"
"github.com/samonzeweb/godb"
"github.com/samonzeweb/godb/adapters/sqlite"
) )
// Address information. // Address information.

View file

@ -56,6 +56,11 @@ func (rl *RequestLogger) Log(r *http.Request) error {
le.Proto = "https" le.Proto = "https"
} }
// Do not log 127.0.0.1 connections
if le.IP == "127.0.0.1" {
return nil
}
rl.m.Lock() rl.m.Lock()
rl.buf[le]++ rl.buf[le]++
rl.m.Unlock() rl.m.Unlock()

89
internal/processor/j1.go Normal file
View file

@ -0,0 +1,89 @@
package processor
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
)
func getAny(req *http.Request, tr1, tr2, tr3 *http.Transport) (*ResponseWithHeader, error) {
uri := strings.ReplaceAll(req.URL.RequestURI(), "%", "%25")
u, err := url.Parse(uri)
if err != nil {
return nil, err
}
format := u.Query().Get("format")
if format == "j1" {
return getJ1(req, tr1)
} else if format != "" {
return getFormat(req, tr2)
}
// log.Println(req.URL.Query())
// log.Println()
return getDefault(req, tr3)
}
func getJ1(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
return getUpstream(req, transport)
}
func getFormat(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
return getUpstream(req, transport)
}
func getDefault(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
return getUpstream(req, transport)
}
func getUpstream(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
client := &http.Client{
Transport: transport,
}
queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body)
if err != nil {
return nil, err
}
// proxyReq.Header.Set("Host", req.Host)
// proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
for header, values := range req.Header {
for _, value := range values {
proxyReq.Header.Add(header, value)
}
}
if proxyReq.Header.Get("X-Forwarded-For") == "" {
proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr))
}
res, err := client.Do(proxyReq)
if err != nil {
return nil, err
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
return &ResponseWithHeader{
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: body,
Header: res.Header,
StatusCode: res.StatusCode,
}, nil
}

View file

@ -3,7 +3,6 @@ package processor
import ( import (
"context" "context"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"math/rand" "math/rand"
"net" "net"
@ -52,15 +51,17 @@ type ResponseWithHeader struct {
// RequestProcessor handles incoming requests. // RequestProcessor handles incoming requests.
type RequestProcessor struct { type RequestProcessor struct {
peakRequest30 sync.Map peakRequest30 sync.Map
peakRequest60 sync.Map peakRequest60 sync.Map
lruCache *lru.Cache lruCache *lru.Cache
stats *stats.Stats stats *stats.Stats
router routing.Router router routing.Router
upstreamTransport *http.Transport upstreamTransport1 *http.Transport
config *config.Config upstreamTransport2 *http.Transport
geoIPCache *geoip.Cache upstreamTransport3 *http.Transport
geoLocation *geoloc.Cache config *config.Config
geoIPCache *geoip.Cache
geoLocation *geoloc.Cache
} }
// NewRequestProcessor returns new RequestProcessor. // NewRequestProcessor returns new RequestProcessor.
@ -76,9 +77,19 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
DualStack: true, DualStack: true,
} }
transport := &http.Transport{ transport1 := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, config.Uplink.Address) return dialer.DialContext(ctx, network, config.Uplink.Address1)
},
}
transport2 := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, config.Uplink.Address2)
},
}
transport3 := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, config.Uplink.Address3)
}, },
} }
@ -93,12 +104,14 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
} }
rp := &RequestProcessor{ rp := &RequestProcessor{
lruCache: lruCache, lruCache: lruCache,
stats: stats.New(), stats: stats.New(),
upstreamTransport: transport, upstreamTransport1: transport1,
config: config, upstreamTransport2: transport2,
geoIPCache: geoCache, upstreamTransport3: transport3,
geoLocation: geoLocation, config: config,
geoIPCache: geoCache,
geoLocation: geoLocation,
} }
// Initialize routes. // Initialize routes.
@ -142,7 +155,7 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader
if dontCache(r) { if dontCache(r) {
rp.stats.Inc("uncached") rp.stats.Inc("uncached")
return get(r, rp.upstreamTransport) return getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3)
} }
// processing cached request // processing cached request
@ -173,8 +186,9 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi
return nil return nil
} }
// if after all attempts we still have no answer, // If after all attempts we still have no answer,
// we try to make the query on our own // respond with an error message.
// (WAS: we try to make the query on our own)
for attempts := 0; attempts < 300; attempts++ { for attempts := 0; attempts < 300; attempts++ {
if !ok || !cacheEntry.InProgress { if !ok || !cacheEntry.InProgress {
break break
@ -187,7 +201,13 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi
} }
} }
if cacheEntry.InProgress { if cacheEntry.InProgress {
log.Printf("TIMEOUT: %s\n", cacheDigest) // log.Printf("TIMEOUT: %s\n", cacheDigest)
return &ResponseWithHeader{
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: []byte("This query is already being processed"),
StatusCode: 200,
}
} }
if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) { if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) {
rp.stats.Inc("cache1") rp.stats.Inc("cache1")
@ -207,6 +227,9 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
err error err error
) )
// Indicate, that the request is being handled.
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
// Response was not found in cache. // Response was not found in cache.
// Starting real handling. // Starting real handling.
format := r.URL.Query().Get("format") format := r.URL.Query().Get("format")
@ -223,10 +246,7 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
rp.stats.Inc("geoip") rp.stats.Inc("geoip")
} }
// Indicate, that the request is being handled. response, err = getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3)
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
response, err = get(r, rp.upstreamTransport)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -240,51 +260,6 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
return response, nil return response, nil
} }
func get(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
client := &http.Client{
Transport: transport,
}
queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body)
if err != nil {
return nil, err
}
// proxyReq.Header.Set("Host", req.Host)
// proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
for header, values := range req.Header {
for _, value := range values {
proxyReq.Header.Add(header, value)
}
}
if proxyReq.Header.Get("X-Forwarded-For") == "" {
proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr))
}
res, err := client.Do(proxyReq)
if err != nil {
return nil, err
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
return &ResponseWithHeader{
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: body,
Header: res.Header,
StatusCode: res.StatusCode,
}, nil
}
// getCacheDigest is an implementation of the cache.get_signature of original wttr.in. // getCacheDigest is an implementation of the cache.get_signature of original wttr.in.
func getCacheDigest(req *http.Request) string { func getCacheDigest(req *http.Request) string {
userAgent := req.Header.Get("User-Agent") userAgent := req.Header.Get("User-Agent")

View file

@ -102,7 +102,7 @@ def _geolocator(location):
if random.random() < 0: if random.random() < 0:
geo = requests.get('%s/%s' % (GEOLOCATOR_SERVICE, location)).text geo = requests.get('%s/%s' % (GEOLOCATOR_SERVICE, location)).text
else: else:
geo = requests.get("http://127.0.0.1:8083/:geo-location?location=%s" % location).text geo = requests.get("http://127.0.0.1:8085/:geo-location?location=%s" % location).text
except requests.exceptions.ConnectionError as exception: except requests.exceptions.ConnectionError as exception:
print("ERROR: %s" % exception) print("ERROR: %s" % exception)
return None return None
@ -152,7 +152,7 @@ def _ipcache(ip_addr):
""" """
## Use Geo IP service when available ## Use Geo IP service when available
r = requests.get("http://127.0.0.1:8083/:geo-ip-get?ip=%s" % ip_addr) r = requests.get("http://127.0.0.1:8085/:geo-ip-get?ip=%s" % ip_addr)
if r.status_code == 200 and ";" in r.text: if r.status_code == 200 and ";" in r.text:
_, country, region, city, *_ = r.text.split(';') _, country, region, city, *_ = r.text.split(';')
return city, region, country return city, region, country