Distribute load between three backend servers

This commit is contained in:
Igor Chubin 2024-01-05 21:50:53 +01:00
parent ccd7360ece
commit b1d4ddd6d1
3 changed files with 54 additions and 31 deletions

View file

@ -50,8 +50,17 @@ type Server struct {
// Uplink configuration.
type Uplink struct {
// Address contains address of the uplink server in form IP:PORT.
Address string `yaml:"address,omitempty"`
// Address1 contains address of the uplink server in form IP:PORT
// for format=j1 queries.
Address1 string `yaml:"address1,omitempty"`
// Address2 contains address of the uplink server in form IP:PORT
// for format=* queries.
Address2 string `yaml:"address2,omitempty"`
// Address3 contains address of the uplink server in form IP:PORT
// for all other queries.
Address3 string `yaml:"address3,omitempty"`
// Timeout for upstream queries.
Timeout int `yaml:"timeout,omitempty"`
@ -85,7 +94,7 @@ type Geo struct {
LocationCacheType types.CacheType `yaml:"locationCacheType,omitempty"`
Nominatim []Nominatim
Nominatim []Nominatim `yaml:"nominatim"`
}
type Nominatim struct {
@ -140,7 +149,9 @@ func Default() *Config {
TLSKeyFile: "/wttr.in/etc/privkey.pem",
},
Uplink{
Address: "127.0.0.1:9002",
Address1: "127.0.0.1:9002",
Address2: "127.0.0.1:9002",
Address3: "127.0.0.1:9002",
Timeout: 30,
PrefetchInterval: 300,
},

View file

@ -10,12 +10,13 @@ import (
"strconv"
"strings"
"github.com/samonzeweb/godb"
"github.com/samonzeweb/godb/adapters/sqlite"
"github.com/chubin/wttr.in/internal/config"
"github.com/chubin/wttr.in/internal/routing"
"github.com/chubin/wttr.in/internal/types"
"github.com/chubin/wttr.in/internal/util"
"github.com/samonzeweb/godb"
"github.com/samonzeweb/godb/adapters/sqlite"
)
// Address information.

View file

@ -3,7 +3,6 @@ package processor
import (
"context"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
@ -52,15 +51,17 @@ type ResponseWithHeader struct {
// RequestProcessor handles incoming requests.
type RequestProcessor struct {
peakRequest30 sync.Map
peakRequest60 sync.Map
lruCache *lru.Cache
stats *stats.Stats
router routing.Router
upstreamTransport *http.Transport
config *config.Config
geoIPCache *geoip.Cache
geoLocation *geoloc.Cache
peakRequest30 sync.Map
peakRequest60 sync.Map
lruCache *lru.Cache
stats *stats.Stats
router routing.Router
upstreamTransport1 *http.Transport
upstreamTransport2 *http.Transport
upstreamTransport3 *http.Transport
config *config.Config
geoIPCache *geoip.Cache
geoLocation *geoloc.Cache
}
// NewRequestProcessor returns new RequestProcessor.
@ -76,9 +77,19 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
DualStack: true,
}
transport := &http.Transport{
transport1 := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, config.Uplink.Address)
return dialer.DialContext(ctx, network, config.Uplink.Address1)
},
}
transport2 := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, config.Uplink.Address2)
},
}
transport3 := &http.Transport{
DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, config.Uplink.Address3)
},
}
@ -93,12 +104,14 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) {
}
rp := &RequestProcessor{
lruCache: lruCache,
stats: stats.New(),
upstreamTransport: transport,
config: config,
geoIPCache: geoCache,
geoLocation: geoLocation,
lruCache: lruCache,
stats: stats.New(),
upstreamTransport1: transport1,
upstreamTransport2: transport2,
upstreamTransport3: transport3,
config: config,
geoIPCache: geoCache,
geoLocation: geoLocation,
}
// Initialize routes.
@ -142,7 +155,7 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader
if dontCache(r) {
rp.stats.Inc("uncached")
return get(r, rp.upstreamTransport)
return getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3)
}
// processing cached request
@ -173,8 +186,9 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi
return nil
}
// if after all attempts we still have no answer,
// we try to make the query on our own
// If after all attempts we still have no answer,
// respond with an error message.
// (WAS: we try to make the query on our own)
for attempts := 0; attempts < 300; attempts++ {
if !ok || !cacheEntry.InProgress {
break
@ -223,10 +237,7 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi
rp.stats.Inc("geoip")
}
// Indicate, that the request is being handled.
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
response, err = get(r, rp.upstreamTransport)
response, err = getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3)
if err != nil {
return nil, err
}