dendrite/cmd/resolve-state/main.go
Neil Alexander 3ea21273bc
Ristretto cache (#2563)
* Try Ristretto cache

* Tweak

* It's beautiful

* Update GMSL

* More strict keyable interface

* Fix that some more

* Make less panicky

* Don't enforce mutability checks for now

* Determine mutability using deep equality

* Tweaks

* Namespace keys

* Make federation caches mutable

* Update cost estimation, add metric

* Update GMSL

* Estimate cost for metrics better

* Reduce counters a bit

* Try caching events

* Some guards

* Try again

* Try this

* Use separate caches for hopefully better hash distribution

* Fix bug with admitting events into cache

* Try to fix bugs

* Check nil

* Try that again

* Preserve order jeezo this is messy

* thanks VS Code for doing exactly the wrong thing

* Try this again

* Be more specific

* aaaaargh

* One more time

* That might be better

* Stronger sorting

* Cache expiries, async publishing of EDUs

* Put it back

* Use a shared cache again

* Cost estimation fixes

* Update ristretto

* Reduce counters a bit

* Clean up a bit

* Update GMSL

* 1GB

* Configurable cache sizes

* Tweaks

* Add `config.DataUnit` for specifying friendly cache sizes

* Various tweaks

* Update GMSL

* Add back some lazy loading caching

* Include key in cost

* Include key in cost

* Tweak max age handling, config key name

* Only register prometheus metrics if requested

* Review comments @S7evinK

* Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere)

* Review comments

* Update sample configs

* Update GHA Workflow

* Update Complement images to Go 1.18

* Remove the cache test from the federation API as we no longer guarantee immediate cache admission

* Don't check the caches in the renewal test

* Possibly fix the upgrade tests

* Update to matrix-org/gomatrixserverlib#322

* Update documentation to refer to Go 1.18
2022-07-11 14:31:31 +01:00

217 lines
5.7 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
)
// This is a utility for inspecting state snapshots and running state resolution
// against real snapshots in an actual database.
// It takes one or more state snapshot NIDs as arguments, along with a room version
// to use for unmarshalling events, and will produce resolved output.
//
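// Usage: ./resolve-state [--roomversion=version] [--filtertype=type] [--difference] snapshot [snapshot ...]
// e.g. ./resolve-state --roomversion=5 1254 1235 1282
// e.g. ./resolve-state --roomversion=5 --difference 1254 1235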
// Command-line flags understood by this tool, in addition to whatever
// setup.ParseFlags registers.
var (
	// roomVersion is the room version passed to the state resolver and to
	// gomatrixserverlib.ResolveConflicts.
	roomVersion = flag.String("roomversion", "5", "the room version to parse events as")

	// filterType, when non-empty, restricts the printed resolved state to
	// events whose type is exactly equal to this value. It is not consulted
	// in --difference mode.
	filterType = flag.String("filtertype", "", "the event types to filter on")

	// difference switches the tool from resolving state to printing the
	// entries removed and added between exactly two snapshots.
	difference = flag.Bool("difference", false, "whether to calculate the difference between snapshots")
)
// nolint:gocyclo
func main() {
ctx := context.Background()
cfg := setup.ParseFlags(true)
cfg.Logging = append(cfg.Logging[:0], config.LogrusHook{
Type: "std",
Level: "error",
})
cfg.ClientAPI.RegistrationDisabled = true
base := base.NewBaseDendrite(cfg, "ResolveState", base.DisableMetrics)
args := flag.Args()
fmt.Println("Room version", *roomVersion)
snapshotNIDs := []types.StateSnapshotNID{}
for _, arg := range args {
if i, err := strconv.Atoi(arg); err == nil {
snapshotNIDs = append(snapshotNIDs, types.StateSnapshotNID(i))
}
}
fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs")
roomserverDB, err := storage.Open(
base, &cfg.RoomServer.Database,
caching.NewRistrettoCache(128*1024*1024, time.Hour, true),
)
if err != nil {
panic(err)
}
stateres := state.NewStateResolution(roomserverDB, &types.RoomInfo{
RoomVersion: gomatrixserverlib.RoomVersion(*roomVersion),
})
if *difference {
if len(snapshotNIDs) != 2 {
panic("need exactly two state snapshot NIDs to calculate difference")
}
var removed, added []types.StateEntry
removed, added, err = stateres.DifferenceBetweeenStateSnapshots(ctx, snapshotNIDs[0], snapshotNIDs[1])
if err != nil {
panic(err)
}
var eventNIDs []types.EventNID
for _, entry := range append(removed, added...) {
eventNIDs = append(eventNIDs, entry.EventNID)
}
var eventEntries []types.Event
eventEntries, err = roomserverDB.Events(ctx, eventNIDs)
if err != nil {
panic(err)
}
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(eventEntries))
for _, entry := range eventEntries {
events[entry.EventNID] = entry.Event
}
if len(removed) > 0 {
fmt.Println("Removed:")
for _, r := range removed {
event := events[r.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
if len(removed) > 0 && len(added) > 0 {
fmt.Println()
}
if len(added) > 0 {
fmt.Println("Added:")
for _, a := range added {
event := events[a.EventNID]
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
}
return
}
var stateEntries []types.StateEntry
for _, snapshotNID := range snapshotNIDs {
var entries []types.StateEntry
entries, err = stateres.LoadStateAtSnapshot(ctx, snapshotNID)
if err != nil {
panic(err)
}
stateEntries = append(stateEntries, entries...)
}
var eventNIDs []types.EventNID
for _, entry := range stateEntries {
eventNIDs = append(eventNIDs, entry.EventNID)
}
fmt.Println("Fetching", len(eventNIDs), "state events")
eventEntries, err := roomserverDB.Events(ctx, eventNIDs)
if err != nil {
panic(err)
}
authEventIDMap := make(map[string]struct{})
events := make([]*gomatrixserverlib.Event, len(eventEntries))
for i := range eventEntries {
events[i] = eventEntries[i].Event
for _, authEventID := range eventEntries[i].AuthEventIDs() {
authEventIDMap[authEventID] = struct{}{}
}
}
authEventIDs := make([]string, 0, len(authEventIDMap))
for authEventID := range authEventIDMap {
authEventIDs = append(authEventIDs, authEventID)
}
fmt.Println("Fetching", len(authEventIDs), "auth events")
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs)
if err != nil {
panic(err)
}
authEvents := make([]*gomatrixserverlib.Event, len(authEventEntries))
for i := range authEventEntries {
authEvents[i] = authEventEntries[i].Event
}
fmt.Println("Resolving state")
var resolved Events
resolved, err = gomatrixserverlib.ResolveConflicts(
gomatrixserverlib.RoomVersion(*roomVersion),
events,
authEvents,
)
if err != nil {
panic(err)
}
fmt.Println("Resolved state contains", len(resolved), "events")
sort.Sort(resolved)
filteringEventType := *filterType
count := 0
for _, event := range resolved {
if filteringEventType != "" && event.Type() != filteringEventType {
continue
}
count++
fmt.Println()
fmt.Printf("* %s %s %q\n", event.EventID(), event.Type(), *event.StateKey())
fmt.Printf(" %s\n", string(event.Content()))
}
fmt.Println()
fmt.Println("Returned", count, "state events after filtering")
}
// Events is a list of events that implements sort.Interface, ordering by
// event type and then by state key.
type Events []*gomatrixserverlib.Event

// Len reports the number of events in the list.
func (e Events) Len() int { return len(e) }

// Swap exchanges the events at positions i and j.
func (e Events) Swap(i, j int) {
	held := e[i]
	e[i] = e[j]
	e[j] = held
}

// Less reports whether the event at position i sorts before the event at
// position j. Event types are compared first; the state keys are only
// consulted (and dereferenced) when the types are equal.
func (e Events) Less(i, j int) bool {
	left, right := e[i], e[j]
	switch byType := strings.Compare(left.Type(), right.Type()); {
	case byType < 0:
		return true
	case byType > 0:
		return false
	}
	return strings.Compare(*left.StateKey(), *right.StateKey()) < 0
}