mirror of
https://github.com/matrix-org/dendrite
synced 2024-11-10 07:04:24 +00:00
Don't query for servers so often in /send (#1766)
* Look up servers less often, don't hit API for missing auth events unless there are actually missing auth events * Remove ResolveConflictsAdhoc (since it is already in GMSL), other tweaks * Update gomatrixserverlib to matrix-org/gomatrixserverlib#254 * Fix resolve-state * Initialise t.servers on first use
This commit is contained in:
parent
f448e8972a
commit
5d74a1757f
6 changed files with 50 additions and 144 deletions
|
@ -8,7 +8,6 @@ import (
|
|||
"strconv"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup"
|
||||
|
@ -105,7 +104,7 @@ func main() {
|
|||
}
|
||||
|
||||
fmt.Println("Resolving state")
|
||||
resolved, err := state.ResolveConflictsAdhoc(
|
||||
resolved, err := gomatrixserverlib.ResolveConflicts(
|
||||
gomatrixserverlib.RoomVersion(*roomVersion),
|
||||
events,
|
||||
authEvents,
|
||||
|
|
|
@ -102,11 +102,13 @@ func Send(
|
|||
|
||||
type txnReq struct {
|
||||
gomatrixserverlib.Transaction
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
eduAPI eduserverAPI.EDUServerInputAPI
|
||||
keyAPI keyapi.KeyInternalAPI
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
federation txnFederationClient
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
eduAPI eduserverAPI.EDUServerInputAPI
|
||||
keyAPI keyapi.KeyInternalAPI
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
federation txnFederationClient
|
||||
servers []gomatrixserverlib.ServerName
|
||||
serversMutex sync.RWMutex
|
||||
// local cache of events for auth checks, etc - this may include events
|
||||
// which the roomserver is unaware of.
|
||||
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
||||
|
@ -404,16 +406,21 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
|
|||
}
|
||||
|
||||
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
|
||||
servers := []gomatrixserverlib.ServerName{t.Origin}
|
||||
t.serversMutex.Lock()
|
||||
defer t.serversMutex.Unlock()
|
||||
if t.servers != nil {
|
||||
return t.servers
|
||||
}
|
||||
t.servers = []gomatrixserverlib.ServerName{t.Origin}
|
||||
serverReq := &api.QueryServerJoinedToRoomRequest{
|
||||
RoomID: roomID,
|
||||
}
|
||||
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
||||
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
||||
servers = append(servers, serverRes.ServerNames...)
|
||||
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID)
|
||||
t.servers = append(t.servers, serverRes.ServerNames...)
|
||||
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
|
||||
}
|
||||
return servers
|
||||
return t.servers
|
||||
}
|
||||
|
||||
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
|
||||
|
@ -482,14 +489,10 @@ func (t *txnReq) retrieveMissingAuthEvents(
|
|||
missingAuthEvents[missingAuthEventID] = struct{}{}
|
||||
}
|
||||
|
||||
servers := t.getServers(ctx, e.RoomID())
|
||||
if len(servers) > 5 {
|
||||
servers = servers[:5]
|
||||
}
|
||||
withNextEvent:
|
||||
for missingAuthEventID := range missingAuthEvents {
|
||||
withNextServer:
|
||||
for _, server := range servers {
|
||||
for _, server := range t.getServers(ctx, e.RoomID()) {
|
||||
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
|
||||
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
|
||||
if err != nil {
|
||||
|
@ -692,13 +695,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
|
|||
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
|
||||
}
|
||||
|
||||
servers := t.getServers(ctx, roomID)
|
||||
if len(servers) > 5 {
|
||||
servers = servers[:5]
|
||||
}
|
||||
|
||||
// fetch the event we're missing and add it to the pile
|
||||
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
|
||||
h, err := t.lookupEvent(ctx, roomVersion, roomID, eventID, false)
|
||||
switch err.(type) {
|
||||
case verifySigError:
|
||||
return respState, false, nil
|
||||
|
@ -740,11 +738,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
|||
t.haveEvents[ev.EventID()] = res.StateEvents[i]
|
||||
}
|
||||
var authEvents []*gomatrixserverlib.Event
|
||||
missingAuthEvents := make(map[string]bool)
|
||||
missingAuthEvents := map[string]bool{}
|
||||
for _, ev := range res.StateEvents {
|
||||
for _, ae := range ev.AuthEventIDs() {
|
||||
aev, ok := t.haveEvents[ae]
|
||||
if ok {
|
||||
if aev, ok := t.haveEvents[ae]; ok {
|
||||
authEvents = append(authEvents, aev.Unwrap())
|
||||
} else {
|
||||
missingAuthEvents[ae] = true
|
||||
|
@ -753,27 +750,28 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
|
|||
}
|
||||
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
|
||||
// have stored the event.
|
||||
var missingEventList []string
|
||||
for evID := range missingAuthEvents {
|
||||
missingEventList = append(missingEventList, evID)
|
||||
}
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
EventIDs: missingEventList,
|
||||
}
|
||||
util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
|
||||
var queryRes api.QueryEventsByIDResponse
|
||||
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
|
||||
return nil
|
||||
}
|
||||
for i := range queryRes.Events {
|
||||
evID := queryRes.Events[i].EventID()
|
||||
t.haveEvents[evID] = queryRes.Events[i]
|
||||
authEvents = append(authEvents, queryRes.Events[i].Unwrap())
|
||||
if len(missingAuthEvents) > 0 {
|
||||
var missingEventList []string
|
||||
for evID := range missingAuthEvents {
|
||||
missingEventList = append(missingEventList, evID)
|
||||
}
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
EventIDs: missingEventList,
|
||||
}
|
||||
util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
|
||||
var queryRes api.QueryEventsByIDResponse
|
||||
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
|
||||
return nil
|
||||
}
|
||||
for i := range queryRes.Events {
|
||||
evID := queryRes.Events[i].EventID()
|
||||
t.haveEvents[evID] = queryRes.Events[i]
|
||||
authEvents = append(authEvents, queryRes.Events[i].Unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
|
||||
return &gomatrixserverlib.RespState{
|
||||
StateEvents: evs,
|
||||
StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
|
||||
AuthEvents: authEvents,
|
||||
}
|
||||
}
|
||||
|
@ -805,11 +803,7 @@ retryAllowedState:
|
|||
if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil {
|
||||
switch missing := err.(type) {
|
||||
case gomatrixserverlib.MissingAuthEventError:
|
||||
servers := t.getServers(ctx, backwardsExtremity.RoomID())
|
||||
if len(servers) > 5 {
|
||||
servers = servers[:5]
|
||||
}
|
||||
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
|
||||
h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
|
||||
switch err2.(type) {
|
||||
case verifySigError:
|
||||
return &gomatrixserverlib.RespState{
|
||||
|
@ -857,17 +851,8 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
|
|||
latestEvents[i] = res.LatestEvents[i].EventID
|
||||
}
|
||||
|
||||
servers := []gomatrixserverlib.ServerName{t.Origin}
|
||||
serverReq := &api.QueryServerJoinedToRoomRequest{
|
||||
RoomID: e.RoomID(),
|
||||
}
|
||||
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
||||
if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
||||
servers = append(servers, serverRes.ServerNames...)
|
||||
logger.Infof("Found %d server(s) to query for missing events", len(servers))
|
||||
}
|
||||
|
||||
var missingResp *gomatrixserverlib.RespMissingEvents
|
||||
servers := t.getServers(ctx, e.RoomID())
|
||||
for _, server := range servers {
|
||||
var m gomatrixserverlib.RespMissingEvents
|
||||
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
|
||||
|
@ -1015,12 +1000,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
|||
"concurrent_requests": concurrentRequests,
|
||||
}).Info("Fetching missing state at event")
|
||||
|
||||
// Get a list of servers to fetch from.
|
||||
servers := t.getServers(ctx, roomID)
|
||||
if len(servers) > 5 {
|
||||
servers = servers[:5]
|
||||
}
|
||||
|
||||
// Create a queue containing all of the missing event IDs that we want
|
||||
// to retrieve.
|
||||
pending := make(chan string, missingCount)
|
||||
|
@ -1046,7 +1025,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
|
|||
// Define what we'll do in order to fetch the missing event ID.
|
||||
fetch := func(missingEventID string) {
|
||||
var h *gomatrixserverlib.HeaderedEvent
|
||||
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
|
||||
h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
|
||||
switch err.(type) {
|
||||
case verifySigError:
|
||||
return
|
||||
|
@ -1112,7 +1091,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
|
|||
return &respState, nil
|
||||
}
|
||||
|
||||
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
if localFirst {
|
||||
// fetch from the roomserver
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
|
@ -1127,6 +1106,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
|
|||
}
|
||||
var event *gomatrixserverlib.Event
|
||||
found := false
|
||||
servers := t.getServers(ctx, roomID)
|
||||
for _, serverName := range servers {
|
||||
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
|
||||
if err != nil || len(txn.PDUs) == 0 {
|
||||
|
|
2
go.mod
2
go.mod
|
@ -22,7 +22,7 @@ require (
|
|||
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
|
||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0
|
||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.2
|
||||
|
|
4
go.sum
4
go.sum
|
@ -567,8 +567,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead h1:VmGJybKUQin8+NyA9ZkrHJpE8ygXzcON9peQH9LC92c=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0 h1:eP8t7DaLKkNz0IT9GcJeG6UTKjfvihIxbAXKN0I7j6g=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
|
||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
|
||||
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
|
||||
|
|
|
@ -112,7 +112,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
|||
return fmt.Errorf("getAuthChain: %w", err)
|
||||
}
|
||||
|
||||
stateEvents, err = state.ResolveConflictsAdhoc(info.RoomVersion, stateEvents, authEvents)
|
||||
stateEvents, err = gomatrixserverlib.ResolveConflicts(info.RoomVersion, stateEvents, authEvents)
|
||||
if err != nil {
|
||||
return fmt.Errorf("state.ResolveConflictsAdhoc: %w", err)
|
||||
}
|
||||
|
@ -469,7 +469,7 @@ func (r *Queryer) QueryStateAndAuthChain(
|
|||
}
|
||||
|
||||
if request.ResolveState {
|
||||
if stateEvents, err = state.ResolveConflictsAdhoc(
|
||||
if stateEvents, err = gomatrixserverlib.ResolveConflicts(
|
||||
info.RoomVersion, stateEvents, authEvents,
|
||||
); err != nil {
|
||||
return err
|
||||
|
|
|
@ -683,79 +683,6 @@ func (v *StateResolution) calculateStateAfterManyEvents(
|
|||
return
|
||||
}
|
||||
|
||||
// ResolveConflictsAdhoc is a helper function to assist the query API in
|
||||
// performing state resolution when requested. This is a different code
|
||||
// path to the rest of state.go because this assumes you already have
|
||||
// gomatrixserverlib.Event objects and not just a bunch of NIDs like
|
||||
// elsewhere in the state resolution.
|
||||
// TODO: Some of this can possibly be deduplicated
|
||||
func ResolveConflictsAdhoc(
|
||||
version gomatrixserverlib.RoomVersion,
|
||||
events []*gomatrixserverlib.Event,
|
||||
authEvents []*gomatrixserverlib.Event,
|
||||
) ([]*gomatrixserverlib.Event, error) {
|
||||
type stateKeyTuple struct {
|
||||
Type string
|
||||
StateKey string
|
||||
}
|
||||
|
||||
// Prepare our data structures.
|
||||
eventMap := make(map[stateKeyTuple][]*gomatrixserverlib.Event)
|
||||
var conflicted, notConflicted, resolved []*gomatrixserverlib.Event
|
||||
|
||||
// Run through all of the events that we were given and sort them
|
||||
// into a map, sorted by (event_type, state_key) tuple. This means
|
||||
// that we can easily spot events that are "conflicted", e.g.
|
||||
// there are duplicate values for the same tuple key.
|
||||
for _, event := range events {
|
||||
if event.StateKey() == nil {
|
||||
// Ignore events that are not state events.
|
||||
continue
|
||||
}
|
||||
// Append the events if there is already a conflicted list for
|
||||
// this tuple key, create it if not.
|
||||
tuple := stateKeyTuple{event.Type(), *event.StateKey()}
|
||||
eventMap[tuple] = append(eventMap[tuple], event)
|
||||
}
|
||||
|
||||
// Split out the events in the map into conflicted and unconflicted
|
||||
// buckets. The conflicted events will be ran through state res,
|
||||
// whereas unconfliced events will always going to appear in the
|
||||
// final resolved state.
|
||||
for _, list := range eventMap {
|
||||
if len(list) > 1 {
|
||||
conflicted = append(conflicted, list...)
|
||||
} else {
|
||||
notConflicted = append(notConflicted, list...)
|
||||
}
|
||||
}
|
||||
|
||||
// Work out which state resolution algorithm we want to run for
|
||||
// the room version.
|
||||
stateResAlgo, err := version.StateResAlgorithm()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch stateResAlgo {
|
||||
case gomatrixserverlib.StateResV1:
|
||||
// Currently state res v1 doesn't handle unconflicted events
|
||||
// for us, like state res v2 does, so we will need to add the
|
||||
// unconflicted events into the state ourselves.
|
||||
// TODO: Fix state res v1 so this is handled for the caller.
|
||||
resolved = gomatrixserverlib.ResolveStateConflicts(conflicted, authEvents)
|
||||
resolved = append(resolved, notConflicted...)
|
||||
case gomatrixserverlib.StateResV2:
|
||||
// TODO: auth difference here?
|
||||
resolved = gomatrixserverlib.ResolveStateConflictsV2(conflicted, notConflicted, authEvents, authEvents)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo)
|
||||
}
|
||||
|
||||
// Return the final resolved state events, including both the
|
||||
// resolved set of conflicted events, and the unconflicted events.
|
||||
return resolved, nil
|
||||
}
|
||||
|
||||
func (v *StateResolution) resolveConflicts(
|
||||
ctx context.Context, version gomatrixserverlib.RoomVersion,
|
||||
notConflicted, conflicted []types.StateEntry,
|
||||
|
|
Loading…
Reference in a new issue