mirror of
https://github.com/chubin/wttr.in
synced 2024-11-14 16:17:19 +00:00
Merge branch 'master' of https://github.com/chubin/wttr.in
This commit is contained in:
commit
6a223e9d25
5 changed files with 158 additions and 77 deletions
|
@ -50,8 +50,17 @@ type Server struct {
|
|||
|
||||
// Uplink configuration.
|
||||
type Uplink struct {
|
||||
// Address contains address of the uplink server in form IP:PORT.
|
||||
Address string `yaml:"address,omitempty"`
|
||||
// Address1 contains address of the uplink server in form IP:PORT
|
||||
// for format=j1 queries.
|
||||
Address1 string `yaml:"address1,omitempty"`
|
||||
|
||||
// Address2 contains address of the uplink server in form IP:PORT
|
||||
// for format=* queries.
|
||||
Address2 string `yaml:"address2,omitempty"`
|
||||
|
||||
// Address3 contains address of the uplink server in form IP:PORT
|
||||
// for all other queries.
|
||||
Address3 string `yaml:"address3,omitempty"`
|
||||
|
||||
// Timeout for upstream queries.
|
||||
Timeout int `yaml:"timeout,omitempty"`
|
||||
|
@ -85,7 +94,7 @@ type Geo struct {
|
|||
|
||||
LocationCacheType types.CacheType `yaml:"locationCacheType,omitempty"`
|
||||
|
||||
Nominatim []Nominatim
|
||||
Nominatim []Nominatim `yaml:"nominatim"`
|
||||
}
|
||||
|
||||
type Nominatim struct {
|
||||
|
@ -140,7 +149,9 @@ func Default() *Config {
|
|||
TLSKeyFile: "/wttr.in/etc/privkey.pem",
|
||||
},
|
||||
Uplink{
|
||||
Address: "127.0.0.1:9002",
|
||||
Address1: "127.0.0.1:9002",
|
||||
Address2: "127.0.0.1:9002",
|
||||
Address3: "127.0.0.1:9002",
|
||||
Timeout: 30,
|
||||
PrefetchInterval: 300,
|
||||
},
|
||||
|
|
|
@ -10,12 +10,13 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/samonzeweb/godb"
|
||||
"github.com/samonzeweb/godb/adapters/sqlite"
|
||||
|
||||
"github.com/chubin/wttr.in/internal/config"
|
||||
"github.com/chubin/wttr.in/internal/routing"
|
||||
"github.com/chubin/wttr.in/internal/types"
|
||||
"github.com/chubin/wttr.in/internal/util"
|
||||
"github.com/samonzeweb/godb"
|
||||
"github.com/samonzeweb/godb/adapters/sqlite"
|
||||
)
|
||||
|
||||
// Address information.
|
||||
|
|
|
@ -56,6 +56,11 @@ func (rl *RequestLogger) Log(r *http.Request) error {
|
|||
le.Proto = "https"
|
||||
}
|
||||
|
||||
// Do not log 127.0.0.1 connections
|
||||
if le.IP == "127.0.0.1" {
|
||||
return nil
|
||||
}
|
||||
|
||||
rl.m.Lock()
|
||||
rl.buf[le]++
|
||||
rl.m.Unlock()
|
||||
|
|
89
internal/processor/j1.go
Normal file
89
internal/processor/j1.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func getAny(req *http.Request, tr1, tr2, tr3 *http.Transport) (*ResponseWithHeader, error) {
|
||||
uri := strings.ReplaceAll(req.URL.RequestURI(), "%", "%25")
|
||||
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
format := u.Query().Get("format")
|
||||
|
||||
if format == "j1" {
|
||||
return getJ1(req, tr1)
|
||||
} else if format != "" {
|
||||
return getFormat(req, tr2)
|
||||
}
|
||||
|
||||
// log.Println(req.URL.Query())
|
||||
// log.Println()
|
||||
|
||||
return getDefault(req, tr3)
|
||||
}
|
||||
|
||||
func getJ1(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
|
||||
return getUpstream(req, transport)
|
||||
}
|
||||
|
||||
func getFormat(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
|
||||
return getUpstream(req, transport)
|
||||
}
|
||||
|
||||
func getDefault(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
|
||||
return getUpstream(req, transport)
|
||||
}
|
||||
|
||||
func getUpstream(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
|
||||
|
||||
proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// proxyReq.Header.Set("Host", req.Host)
|
||||
// proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
|
||||
|
||||
for header, values := range req.Header {
|
||||
for _, value := range values {
|
||||
proxyReq.Header.Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
if proxyReq.Header.Get("X-Forwarded-For") == "" {
|
||||
proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr))
|
||||
}
|
||||
|
||||
res, err := client.Do(proxyReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ResponseWithHeader{
|
||||
InProgress: false,
|
||||
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
|
||||
Body: body,
|
||||
Header: res.Header,
|
||||
StatusCode: res.StatusCode,
|
||||
}, nil
|
||||
}
|
|
@ -3,7 +3,6 @@ package processor
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
|
@ -52,15 +51,17 @@ type ResponseWithHeader struct {
|
|||
|
||||
// RequestProcessor handles incoming requests.
|
||||
type RequestProcessor struct {
|
||||
peakRequest30 sync.Map
|
||||
peakRequest60 sync.Map
|
||||
lruCache *lru.Cache
|
||||
stats *stats.Stats
|
||||
router routing.Router
|
||||
upstreamTransport *http.Transport
|
||||
config *config.Config
|
||||
geoIPCache *geoip.Cache
|
||||
geoLocation *geoloc.Cache
|
||||
peakRequest30 sync.Map
|
||||
peakRequest60 sync.Map
|
||||
lruCache *lru.Cache
|
||||
stats *stats.Stats
|
||||
router routing.Router
|
||||
upstreamTransport1 *http.Transport
|
||||
upstreamTransport2 *http.Transport
|
||||
upstreamTransport3 *http.Transport
|
||||
config *config.Config
|
||||
geoIPCache *geoip.Cache
|
||||
geoLocation *geoloc.Cache
|
||||
}
|
||||
|
||||
// NewRequestProcessor returns new RequestProcessor.
|
||||
|
@ -76,9 +77,19 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
|
|||
DualStack: true,
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
transport1 := &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
|
||||
return dialer.DialContext(ctx, network, config.Uplink.Address)
|
||||
return dialer.DialContext(ctx, network, config.Uplink.Address1)
|
||||
},
|
||||
}
|
||||
transport2 := &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
|
||||
return dialer.DialContext(ctx, network, config.Uplink.Address2)
|
||||
},
|
||||
}
|
||||
transport3 := &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
|
||||
return dialer.DialContext(ctx, network, config.Uplink.Address3)
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -93,12 +104,14 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
|
|||
}
|
||||
|
||||
rp := &RequestProcessor{
|
||||
lruCache: lruCache,
|
||||
stats: stats.New(),
|
||||
upstreamTransport: transport,
|
||||
config: config,
|
||||
geoIPCache: geoCache,
|
||||
geoLocation: geoLocation,
|
||||
lruCache: lruCache,
|
||||
stats: stats.New(),
|
||||
upstreamTransport1: transport1,
|
||||
upstreamTransport2: transport2,
|
||||
upstreamTransport3: transport3,
|
||||
config: config,
|
||||
geoIPCache: geoCache,
|
||||
geoLocation: geoLocation,
|
||||
}
|
||||
|
||||
// Initialize routes.
|
||||
|
@ -142,7 +155,7 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader
|
|||
if dontCache(r) {
|
||||
rp.stats.Inc("uncached")
|
||||
|
||||
return get(r, rp.upstreamTransport)
|
||||
return getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3)
|
||||
}
|
||||
|
||||
// processing cached request
|
||||
|
@ -173,8 +186,9 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi
|
|||
return nil
|
||||
}
|
||||
|
||||
// if after all attempts we still have no answer,
|
||||
// we try to make the query on our own
|
||||
// If after all attempts we still have no answer,
|
||||
// respond with an error message.
|
||||
// (WAS: we try to make the query on our own)
|
||||
for attempts := 0; attempts < 300; attempts++ {
|
||||
if !ok || !cacheEntry.InProgress {
|
||||
break
|
||||
|
@ -187,7 +201,13 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi
|
|||
}
|
||||
}
|
||||
if cacheEntry.InProgress {
|
||||
log.Printf("TIMEOUT: %s\n", cacheDigest)
|
||||
// log.Printf("TIMEOUT: %s\n", cacheDigest)
|
||||
return &ResponseWithHeader{
|
||||
InProgress: false,
|
||||
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
|
||||
Body: []byte("This query is already being processed"),
|
||||
StatusCode: 200,
|
||||
}
|
||||
}
|
||||
if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) {
|
||||
rp.stats.Inc("cache1")
|
||||
|
@ -207,6 +227,9 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
|
|||
err error
|
||||
)
|
||||
|
||||
// Indicate, that the request is being handled.
|
||||
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
|
||||
|
||||
// Response was not found in cache.
|
||||
// Starting real handling.
|
||||
format := r.URL.Query().Get("format")
|
||||
|
@ -223,10 +246,7 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
|
|||
rp.stats.Inc("geoip")
|
||||
}
|
||||
|
||||
// Indicate, that the request is being handled.
|
||||
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
|
||||
|
||||
response, err = get(r, rp.upstreamTransport)
|
||||
response, err = getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -240,51 +260,6 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
|
|||
return response, nil
|
||||
}
|
||||
|
||||
func get(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) {
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
|
||||
|
||||
proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// proxyReq.Header.Set("Host", req.Host)
|
||||
// proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
|
||||
|
||||
for header, values := range req.Header {
|
||||
for _, value := range values {
|
||||
proxyReq.Header.Add(header, value)
|
||||
}
|
||||
}
|
||||
|
||||
if proxyReq.Header.Get("X-Forwarded-For") == "" {
|
||||
proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr))
|
||||
}
|
||||
|
||||
res, err := client.Do(proxyReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ResponseWithHeader{
|
||||
InProgress: false,
|
||||
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
|
||||
Body: body,
|
||||
Header: res.Header,
|
||||
StatusCode: res.StatusCode,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// getCacheDigest is an implementation of the cache.get_signature of original wttr.in.
|
||||
func getCacheDigest(req *http.Request) string {
|
||||
userAgent := req.Header.Get("User-Agent")
|
||||
|
|
Loading…
Reference in a new issue