More flexible caching (#1101)

This commit is contained in:
Neil Alexander 2020-06-05 16:42:01 +01:00 committed by GitHub
parent 76ff47c052
commit e7b19d2c70
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 189 additions and 142 deletions

View file

@ -255,7 +255,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
panic(err) panic(err)
} }
cache, err := caching.NewImmutableInMemoryLRUCache() cache, err := caching.NewInMemoryLRUCache()
if err != nil { if err != nil {
panic(err) panic(err)
} }

View file

@ -66,7 +66,7 @@ type BaseDendrite struct {
UseHTTPAPIs bool UseHTTPAPIs bool
httpClient *http.Client httpClient *http.Client
Cfg *config.Dendrite Cfg *config.Dendrite
ImmutableCache caching.ImmutableCache Caches *caching.Caches
KafkaConsumer sarama.Consumer KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer KafkaProducer sarama.SyncProducer
} }
@ -95,7 +95,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
kafkaConsumer, kafkaProducer = setupKafka(cfg) kafkaConsumer, kafkaProducer = setupKafka(cfg)
} }
cache, err := caching.NewImmutableInMemoryLRUCache() cache, err := caching.NewInMemoryLRUCache()
if err != nil { if err != nil {
logrus.WithError(err).Warnf("Failed to create cache") logrus.WithError(err).Warnf("Failed to create cache")
} }
@ -126,7 +126,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
UseHTTPAPIs: useHTTPAPIs, UseHTTPAPIs: useHTTPAPIs,
tracerCloser: closer, tracerCloser: closer,
Cfg: cfg, Cfg: cfg,
ImmutableCache: cache, Caches: cache,
PublicAPIMux: httpmux.PathPrefix(httpapis.PublicPathPrefix).Subrouter().UseEncodedPath(), PublicAPIMux: httpmux.PathPrefix(httpapis.PublicPathPrefix).Subrouter().UseEncodedPath(),
InternalAPIMux: httpmux.PathPrefix(httpapis.InternalPathPrefix).Subrouter().UseEncodedPath(), InternalAPIMux: httpmux.PathPrefix(httpapis.InternalPathPrefix).Subrouter().UseEncodedPath(),
httpClient: &client, httpClient: &client,
@ -151,7 +151,7 @@ func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI {
// RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP. // RoomserverHTTPClient returns RoomserverInternalAPI for hitting the roomserver over HTTP.
func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI { func (b *BaseDendrite) RoomserverHTTPClient() roomserverAPI.RoomserverInternalAPI {
rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.ImmutableCache) rsAPI, err := rsinthttp.NewRoomserverClient(b.Cfg.RoomServerURL(), b.httpClient, b.Caches)
if err != nil { if err != nil {
logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.httpClient) logrus.WithError(err).Panic("RoomserverHTTPClient failed", b.httpClient)
} }
@ -182,7 +182,7 @@ func (b *BaseDendrite) ServerKeyAPIClient() serverKeyAPI.ServerKeyInternalAPI {
f, err := skinthttp.NewServerKeyClient( f, err := skinthttp.NewServerKeyClient(
b.Cfg.ServerKeyAPIURL(), b.Cfg.ServerKeyAPIURL(),
b.httpClient, b.httpClient,
b.ImmutableCache, b.Caches,
) )
if err != nil { if err != nil {
logrus.WithError(err).Panic("NewServerKeyInternalAPIHTTP failed", b.httpClient) logrus.WithError(err).Panic("NewServerKeyInternalAPIHTTP failed", b.httpClient)

View file

@ -0,0 +1,30 @@
package caching
import "github.com/matrix-org/gomatrixserverlib"
const (
RoomVersionCacheName = "room_versions"
RoomVersionCacheMaxEntries = 1024
RoomVersionCacheMutable = false
)
// RoomVersionsCache contains the subset of functions needed for
// a room version cache.
type RoomVersionCache interface {
GetRoomVersion(roomID string) (roomVersion gomatrixserverlib.RoomVersion, ok bool)
StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion)
}
func (c Caches) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) {
val, found := c.RoomVersions.Get(roomID)
if found && val != nil {
if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok {
return roomVersion, true
}
}
return "", false
}
func (c Caches) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) {
c.RoomVersions.Set(roomID, roomVersion)
}

View file

@ -0,0 +1,41 @@
package caching
import (
"fmt"
"github.com/matrix-org/gomatrixserverlib"
)
const (
ServerKeyCacheName = "server_key"
ServerKeyCacheMaxEntries = 4096
ServerKeyCacheMutable = true
)
// ServerKeyCache contains the subset of functions needed for
// a server key cache.
type ServerKeyCache interface {
GetServerKey(request gomatrixserverlib.PublicKeyLookupRequest) (response gomatrixserverlib.PublicKeyLookupResult, ok bool)
StoreServerKey(request gomatrixserverlib.PublicKeyLookupRequest, response gomatrixserverlib.PublicKeyLookupResult)
}
func (c Caches) GetServerKey(
request gomatrixserverlib.PublicKeyLookupRequest,
) (gomatrixserverlib.PublicKeyLookupResult, bool) {
key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID)
val, found := c.ServerKeys.Get(key)
if found && val != nil {
if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok {
return keyLookupResult, true
}
}
return gomatrixserverlib.PublicKeyLookupResult{}, false
}
func (c Caches) StoreServerKey(
request gomatrixserverlib.PublicKeyLookupRequest,
response gomatrixserverlib.PublicKeyLookupResult,
) {
key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID)
c.ServerKeys.Set(key, response)
}

View file

@ -0,0 +1,15 @@
package caching
// Caches contains a set of references to caches. They may be
// different implementations as long as they satisfy the Cache
// interface.
type Caches struct {
RoomVersions Cache // implements RoomVersionCache
ServerKeys Cache // implements ServerKeyCache
}
// Cache is the interface that an implementation must satisfy.
type Cache interface {
Get(key string) (value interface{}, ok bool)
Set(key string, value interface{})
}

View file

@ -1,17 +0,0 @@
package caching
import (
"github.com/matrix-org/gomatrixserverlib"
)
const (
RoomVersionMaxCacheEntries = 1024
ServerKeysMaxCacheEntries = 1024
)
type ImmutableCache interface {
GetRoomVersion(roomId string) (gomatrixserverlib.RoomVersion, bool)
StoreRoomVersion(roomId string, roomVersion gomatrixserverlib.RoomVersion)
GetServerKey(request gomatrixserverlib.PublicKeyLookupRequest) (gomatrixserverlib.PublicKeyLookupResult, bool)
StoreServerKey(request gomatrixserverlib.PublicKeyLookupRequest, response gomatrixserverlib.PublicKeyLookupResult)
}

View file

@ -1,95 +0,0 @@
package caching
import (
"fmt"
lru "github.com/hashicorp/golang-lru"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type ImmutableInMemoryLRUCache struct {
roomVersions *lru.Cache
serverKeys *lru.Cache
}
func NewImmutableInMemoryLRUCache() (*ImmutableInMemoryLRUCache, error) {
roomVersionCache, rvErr := lru.New(RoomVersionMaxCacheEntries)
if rvErr != nil {
return nil, rvErr
}
serverKeysCache, rvErr := lru.New(ServerKeysMaxCacheEntries)
if rvErr != nil {
return nil, rvErr
}
cache := &ImmutableInMemoryLRUCache{
roomVersions: roomVersionCache,
serverKeys: serverKeysCache,
}
cache.configureMetrics()
return cache, nil
}
func (c *ImmutableInMemoryLRUCache) configureMetrics() {
promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "caching",
Name: "number_room_version_entries",
Help: "The number of room version entries cached.",
}, func() float64 {
return float64(c.roomVersions.Len())
})
promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "caching",
Name: "number_server_key_entries",
Help: "The number of server key entries cached.",
}, func() float64 {
return float64(c.serverKeys.Len())
})
}
func checkForInvalidMutation(cache *lru.Cache, key string, value interface{}) {
if peek, ok := cache.Peek(key); ok && peek != value {
panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key))
}
}
func (c *ImmutableInMemoryLRUCache) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) {
val, found := c.roomVersions.Get(roomID)
if found && val != nil {
if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok {
return roomVersion, true
}
}
return "", false
}
func (c *ImmutableInMemoryLRUCache) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) {
checkForInvalidMutation(c.roomVersions, roomID, roomVersion)
c.roomVersions.Add(roomID, roomVersion)
}
func (c *ImmutableInMemoryLRUCache) GetServerKey(
request gomatrixserverlib.PublicKeyLookupRequest,
) (gomatrixserverlib.PublicKeyLookupResult, bool) {
key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID)
val, found := c.serverKeys.Get(key)
if found && val != nil {
if keyLookupResult, ok := val.(gomatrixserverlib.PublicKeyLookupResult); ok {
return keyLookupResult, true
}
}
return gomatrixserverlib.PublicKeyLookupResult{}, false
}
func (c *ImmutableInMemoryLRUCache) StoreServerKey(
request gomatrixserverlib.PublicKeyLookupRequest,
response gomatrixserverlib.PublicKeyLookupResult,
) {
key := fmt.Sprintf("%s/%s", request.ServerName, request.KeyID)
checkForInvalidMutation(c.roomVersions, key, response)
c.serverKeys.Add(request, response)
}

View file

@ -0,0 +1,73 @@
package caching
import (
"fmt"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
func NewInMemoryLRUCache() (*Caches, error) {
roomVersions, err := NewInMemoryLRUCachePartition(
RoomVersionCacheName,
RoomVersionCacheMutable,
RoomVersionCacheMaxEntries,
)
if err != nil {
return nil, err
}
serverKeys, err := NewInMemoryLRUCachePartition(
ServerKeyCacheName,
ServerKeyCacheMutable,
ServerKeyCacheMaxEntries,
)
if err != nil {
return nil, err
}
return &Caches{
RoomVersions: roomVersions,
ServerKeys: serverKeys,
}, nil
}
type InMemoryLRUCachePartition struct {
name string
mutable bool
maxEntries int
lru *lru.Cache
}
func NewInMemoryLRUCachePartition(name string, mutable bool, maxEntries int) (*InMemoryLRUCachePartition, error) {
var err error
cache := InMemoryLRUCachePartition{
name: name,
mutable: mutable,
maxEntries: maxEntries,
}
cache.lru, err = lru.New(maxEntries)
if err != nil {
return nil, err
}
promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "caching_in_memory_lru",
Name: name,
}, func() float64 {
return float64(cache.lru.Len())
})
return &cache, nil
}
func (c *InMemoryLRUCachePartition) Set(key string, value interface{}) {
if !c.mutable {
if peek, ok := c.lru.Peek(key); ok && peek != value {
panic(fmt.Sprintf("invalid use of immutable cache tries to mutate existing value of %q", key))
}
}
c.lru.Add(key, value)
}
func (c *InMemoryLRUCachePartition) Get(key string) (value interface{}, ok bool) {
return c.lru.Get(key)
}

View file

@ -16,7 +16,7 @@ type RoomserverInternalAPI struct {
DB storage.Database DB storage.Database
Cfg *config.Dendrite Cfg *config.Dendrite
Producer sarama.SyncProducer Producer sarama.SyncProducer
ImmutableCache caching.ImmutableCache Cache caching.RoomVersionCache
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
FedClient *gomatrixserverlib.FederationClient FedClient *gomatrixserverlib.FederationClient

View file

@ -951,7 +951,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom(
request *api.QueryRoomVersionForRoomRequest, request *api.QueryRoomVersionForRoomRequest,
response *api.QueryRoomVersionForRoomResponse, response *api.QueryRoomVersionForRoomResponse,
) error { ) error {
if roomVersion, ok := r.ImmutableCache.GetRoomVersion(request.RoomID); ok { if roomVersion, ok := r.Cache.GetRoomVersion(request.RoomID); ok {
response.RoomVersion = roomVersion response.RoomVersion = roomVersion
return nil return nil
} }
@ -961,6 +961,6 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom(
return err return err
} }
response.RoomVersion = roomVersion response.RoomVersion = roomVersion
r.ImmutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion) r.Cache.StoreRoomVersion(request.RoomID, response.RoomVersion)
return nil return nil
} }

View file

@ -45,7 +45,7 @@ const (
type httpRoomserverInternalAPI struct { type httpRoomserverInternalAPI struct {
roomserverURL string roomserverURL string
httpClient *http.Client httpClient *http.Client
immutableCache caching.ImmutableCache cache caching.RoomVersionCache
} }
// NewRoomserverClient creates a RoomserverInputAPI implemented by talking to a HTTP POST API. // NewRoomserverClient creates a RoomserverInputAPI implemented by talking to a HTTP POST API.
@ -53,7 +53,7 @@ type httpRoomserverInternalAPI struct {
func NewRoomserverClient( func NewRoomserverClient(
roomserverURL string, roomserverURL string,
httpClient *http.Client, httpClient *http.Client,
immutableCache caching.ImmutableCache, cache caching.RoomVersionCache,
) (api.RoomserverInternalAPI, error) { ) (api.RoomserverInternalAPI, error) {
if httpClient == nil { if httpClient == nil {
return nil, errors.New("NewRoomserverInternalAPIHTTP: httpClient is <nil>") return nil, errors.New("NewRoomserverInternalAPIHTTP: httpClient is <nil>")
@ -61,7 +61,7 @@ func NewRoomserverClient(
return &httpRoomserverInternalAPI{ return &httpRoomserverInternalAPI{
roomserverURL: roomserverURL, roomserverURL: roomserverURL,
httpClient: httpClient, httpClient: httpClient,
immutableCache: immutableCache, cache: cache,
}, nil }, nil
} }
@ -320,7 +320,7 @@ func (h *httpRoomserverInternalAPI) QueryRoomVersionForRoom(
request *api.QueryRoomVersionForRoomRequest, request *api.QueryRoomVersionForRoomRequest,
response *api.QueryRoomVersionForRoomResponse, response *api.QueryRoomVersionForRoomResponse,
) error { ) error {
if roomVersion, ok := h.immutableCache.GetRoomVersion(request.RoomID); ok { if roomVersion, ok := h.cache.GetRoomVersion(request.RoomID); ok {
response.RoomVersion = roomVersion response.RoomVersion = roomVersion
return nil return nil
} }
@ -331,7 +331,7 @@ func (h *httpRoomserverInternalAPI) QueryRoomVersionForRoom(
apiURL := h.roomserverURL + RoomserverQueryRoomVersionForRoomPath apiURL := h.roomserverURL + RoomserverQueryRoomVersionForRoomPath
err := internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) err := internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err == nil { if err == nil {
h.immutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion) h.cache.StoreRoomVersion(request.RoomID, response.RoomVersion)
} }
return err return err
} }

View file

@ -44,7 +44,7 @@ func SetupRoomServerComponent(
Cfg: base.Cfg, Cfg: base.Cfg,
Producer: base.KafkaProducer, Producer: base.KafkaProducer,
OutputRoomEventTopic: string(base.Cfg.Kafka.Topics.OutputRoomEvent), OutputRoomEventTopic: string(base.Cfg.Kafka.Topics.OutputRoomEvent),
ImmutableCache: base.ImmutableCache, Cache: base.Caches,
ServerName: base.Cfg.Matrix.ServerName, ServerName: base.Cfg.Matrix.ServerName,
FedClient: fedClient, FedClient: fedClient,
KeyRing: keyRing, KeyRing: keyRing,

View file

@ -24,7 +24,7 @@ const (
func NewServerKeyClient( func NewServerKeyClient(
serverKeyAPIURL string, serverKeyAPIURL string,
httpClient *http.Client, httpClient *http.Client,
immutableCache caching.ImmutableCache, cache caching.ServerKeyCache,
) (api.ServerKeyInternalAPI, error) { ) (api.ServerKeyInternalAPI, error) {
if httpClient == nil { if httpClient == nil {
return nil, errors.New("NewRoomserverInternalAPIHTTP: httpClient is <nil>") return nil, errors.New("NewRoomserverInternalAPIHTTP: httpClient is <nil>")
@ -32,14 +32,14 @@ func NewServerKeyClient(
return &httpServerKeyInternalAPI{ return &httpServerKeyInternalAPI{
serverKeyAPIURL: serverKeyAPIURL, serverKeyAPIURL: serverKeyAPIURL,
httpClient: httpClient, httpClient: httpClient,
immutableCache: immutableCache, cache: cache,
}, nil }, nil
} }
type httpServerKeyInternalAPI struct { type httpServerKeyInternalAPI struct {
serverKeyAPIURL string serverKeyAPIURL string
httpClient *http.Client httpClient *http.Client
immutableCache caching.ImmutableCache cache caching.ServerKeyCache
} }
func (s *httpServerKeyInternalAPI) KeyRing() *gomatrixserverlib.KeyRing { func (s *httpServerKeyInternalAPI) KeyRing() *gomatrixserverlib.KeyRing {
@ -71,7 +71,7 @@ func (s *httpServerKeyInternalAPI) StoreKeys(
response := api.InputPublicKeysResponse{} response := api.InputPublicKeysResponse{}
for req, res := range results { for req, res := range results {
request.Keys[req] = res request.Keys[req] = res
s.immutableCache.StoreServerKey(req, res) s.cache.StoreServerKey(req, res)
} }
return s.InputPublicKeys(ctx, &request, &response) return s.InputPublicKeys(ctx, &request, &response)
} }
@ -92,7 +92,7 @@ func (s *httpServerKeyInternalAPI) FetchKeys(
} }
now := gomatrixserverlib.AsTimestamp(time.Now()) now := gomatrixserverlib.AsTimestamp(time.Now())
for req, ts := range requests { for req, ts := range requests {
if res, ok := s.immutableCache.GetServerKey(req); ok { if res, ok := s.cache.GetServerKey(req); ok {
if now > res.ValidUntilTS && res.ExpiredTS == gomatrixserverlib.PublicKeyNotExpired { if now > res.ValidUntilTS && res.ExpiredTS == gomatrixserverlib.PublicKeyNotExpired {
continue continue
} }
@ -107,7 +107,7 @@ func (s *httpServerKeyInternalAPI) FetchKeys(
} }
for req, res := range response.Results { for req, res := range response.Results {
result[req] = res result[req] = res
s.immutableCache.StoreServerKey(req, res) s.cache.StoreServerKey(req, res)
} }
return result, nil return result, nil
} }

View file

@ -12,7 +12,7 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
func AddRoutes(s api.ServerKeyInternalAPI, internalAPIMux *mux.Router, cache caching.ImmutableCache) { func AddRoutes(s api.ServerKeyInternalAPI, internalAPIMux *mux.Router, cache caching.ServerKeyCache) {
internalAPIMux.Handle(ServerKeyQueryPublicKeyPath, internalAPIMux.Handle(ServerKeyQueryPublicKeyPath,
internal.MakeInternalAPI("queryPublicKeys", func(req *http.Request) util.JSONResponse { internal.MakeInternalAPI("queryPublicKeys", func(req *http.Request) util.JSONResponse {
request := api.QueryPublicKeysRequest{} request := api.QueryPublicKeysRequest{}

View file

@ -29,7 +29,7 @@ func SetupServerKeyAPIComponent(
logrus.WithError(err).Panicf("failed to connect to server key database") logrus.WithError(err).Panicf("failed to connect to server key database")
} }
serverKeyDB, err := cache.NewKeyDatabase(innerDB, base.ImmutableCache) serverKeyDB, err := cache.NewKeyDatabase(innerDB, base.Caches)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database") logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database")
} }
@ -77,7 +77,7 @@ func SetupServerKeyAPIComponent(
}).Info("Enabled perspective key fetcher") }).Info("Enabled perspective key fetcher")
} }
inthttp.AddRoutes(&internalAPI, base.InternalAPIMux, base.ImmutableCache) inthttp.AddRoutes(&internalAPI, base.InternalAPIMux, base.Caches)
return &internalAPI return &internalAPI
} }

View file

@ -12,10 +12,10 @@ import (
// the public keys for other matrix servers. // the public keys for other matrix servers.
type KeyDatabase struct { type KeyDatabase struct {
inner gomatrixserverlib.KeyDatabase inner gomatrixserverlib.KeyDatabase
cache caching.ImmutableCache cache caching.ServerKeyCache
} }
func NewKeyDatabase(inner gomatrixserverlib.KeyDatabase, cache caching.ImmutableCache) (*KeyDatabase, error) { func NewKeyDatabase(inner gomatrixserverlib.KeyDatabase, cache caching.ServerKeyCache) (*KeyDatabase, error) {
if inner == nil { if inner == nil {
return nil, errors.New("inner database can't be nil") return nil, errors.New("inner database can't be nil")
} }