From ec7718e7f842fa0fc5198489c904de21003db4c2 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 11 Jun 2020 19:50:40 +0100 Subject: [PATCH] Roomserver API changes (#1118) * s/QueryBackfill/PerformBackfill/g * OutputEvent now includes AddStateEvents which contain the full event of extra state events * Only include adds not the current event * Get adding state right --- appservice/consumers/roomserver.go | 51 +---------- clientapi/consumers/roomserver.go | 64 ++------------ federationapi/routing/backfill.go | 8 +- federationapi/routing/send_test.go | 6 +- federationsender/consumers/roomserver.go | 6 +- publicroomsapi/consumers/roomserver.go | 17 +--- roomserver/api/api.go | 6 +- roomserver/api/output.go | 27 ++++++ roomserver/api/perform.go | 29 +++++++ roomserver/api/query.go | 29 ------- roomserver/internal/input_latest_events.go | 66 +++++++++++---- ...{query_backfill.go => perform_backfill.go} | 0 roomserver/internal/query.go | 12 +-- roomserver/inthttp/client.go | 18 ++-- roomserver/inthttp/server.go | 10 +-- syncapi/consumers/roomserver.go | 84 +------------------ syncapi/routing/messages.go | 6 +- 17 files changed, 152 insertions(+), 287 deletions(-) rename roomserver/internal/{query_backfill.go => perform_backfill.go} (100%) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index bb4df7906..1657fe542 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -91,60 +91,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), - }).Info("appservice received an event from roomserver") - - missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) - if err != nil { - return err - } - events := append(missingEvents, ev) + events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} + events = append(events, output.NewRoomEvent.AddStateEvents...) // Send event to any relevant application services return s.filterRoomserverEvents(context.TODO(), events) } -// lookupMissingStateEvents looks up the state events that are added by a new event, -// and returns any not already present. -func (s *OutputRoomEventConsumer) lookupMissingStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent, -) ([]gomatrixserverlib.HeaderedEvent, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - return []gomatrixserverlib.HeaderedEvent{}, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.HeaderedEvent{}, nil - } - - result := []gomatrixserverlib.HeaderedEvent{} - missing := []string{} - for _, id := range addsStateEventIDs { - if id != event.EventID() { - // If the event isn't the current one, add it to the list of events - // to retrieve from the roomserver - missing = append(missing, id) - } - } - - // Request the missing events from the roomserver - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - result = append(result, eventResp.Events...) - - return result, nil -} - // filterRoomserverEvents takes in events and decides whether any of them need // to be passed on to an external application service. It does this by checking // each namespace of each registered application service, and if there is a diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index bd8ac1dcd..caa028ba3 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -84,63 +84,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), - }).Info("received event from roomserver") - - events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event) - if err != nil { - return err - } - - return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) -} - -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.Event, -) ([]gomatrixserverlib.Event, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - // If the event is a membership update (e.g. for a profile update), it won't - // show up in AddsStateEventIDs, so we need to add it manually - if event.Type() == "m.room.member" { - return []gomatrixserverlib.Event{event}, nil - } - return nil, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.Event{event}, nil - } - - result := []gomatrixserverlib.Event{} - missing := []string{} - for _, id := range addsStateEventIDs { - // Append the current event in the results if its ID is in the events list - if id == event.EventID() { - result = append(result, event) - } else { - // If the event isn't the current one, add it to the list of events - // to retrieve from the roomserver - missing = append(missing, id) - } - } - - // Request the missing events from the roomserver - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - for _, headeredEvent := range eventResp.Events { - result = append(result, headeredEvent.Event) - } - - return result, nil + return s.db.UpdateMemberships( + context.TODO(), + gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()), + output.NewRoomEvent.RemovesStateEventIDs, + ) } diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index 10bc62630..f906c73c9 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -37,7 +37,7 @@ func Backfill( roomID string, cfg *config.Dendrite, ) util.JSONResponse { - var res api.QueryBackfillResponse + var res api.PerformBackfillResponse var eIDs []string var limit string var exists bool @@ -68,7 +68,7 @@ func Backfill( } // Populate the request. - req := api.QueryBackfillRequest{ + req := api.PerformBackfillRequest{ RoomID: roomID, // we don't know who the successors are for these events, which won't // be a problem because we don't use that information when servicing /backfill requests, @@ -87,8 +87,8 @@ func Backfill( } // Query the roomserver. - if err = rsAPI.QueryBackfill(httpReq.Context(), &req, &res); err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryBackfill failed") + if err = rsAPI.PerformBackfill(httpReq.Context(), &req, &res); err != nil { + util.GetLogger(httpReq.Context()).WithError(err).Error("query.PerformBackfill failed") return jsonerror.InternalServerError() } diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 9081a8700..adae7c221 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -211,10 +211,10 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain( } // Query a given amount (or less) of events prior to a given set of events. -func (t *testRoomserverAPI) QueryBackfill( +func (t *testRoomserverAPI) PerformBackfill( ctx context.Context, - request *api.QueryBackfillRequest, - response *api.QueryBackfillResponse, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, ) error { return fmt.Errorf("not implemented") } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 5f8a555bd..a15937f9e 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -131,11 +131,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event) - if err != nil { - return err - } - addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents) + addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState())) if err != nil { return err } diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index c513d3b24..ba187cb11 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -78,20 +78,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } - ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "type": ev.Type(), - }).Info("received event from roomserver") - - addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs} - var addQueryRes api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil { - log.Warn(err) - return err - } - remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} var remQueryRes api.QueryEventsByIDResponse if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { @@ -100,9 +86,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } var addQueryEvents, remQueryEvents []gomatrixserverlib.Event - for _, headeredEvent := range addQueryRes.Events { + for _, headeredEvent := range output.NewRoomEvent.AddsState() { addQueryEvents = append(addQueryEvents, headeredEvent.Event) } + addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap()) for _, headeredEvent := range remQueryRes.Events { remQueryEvents = append(remQueryEvents, headeredEvent.Event) } diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 3a2ad059b..967f58baf 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -89,10 +89,10 @@ type RoomserverInternalAPI interface { ) error // Query a given amount (or less) of events prior to a given set of events. - QueryBackfill( + PerformBackfill( ctx context.Context, - request *QueryBackfillRequest, - response *QueryBackfillResponse, + request *PerformBackfillRequest, + response *PerformBackfillResponse, ) error // Asks for the default room version as preferred by the server. diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 92a468a96..2bbd97af8 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -63,6 +63,13 @@ type OutputNewRoomEvent struct { // Together with RemovesStateEventIDs this allows the receiver to keep an up to date // view of the current state of the room. AddsStateEventIDs []string `json:"adds_state_event_ids"` + // All extra newly added state events. This is only set if there are *extra* events + // other than `Event`. This can happen when forks get merged because state resolution + // may decide a bunch of state events on one branch are now valid, so they will be + // present in this list. This is useful when trying to maintain the current state of a room + // as to do so you need to include both these events and `Event`. + AddStateEvents []gomatrixserverlib.HeaderedEvent `json:"adds_state_events"` + // The state event IDs that were removed from the state of the room by this event. RemovesStateEventIDs []string `json:"removes_state_event_ids"` // The ID of the event that was output before this event. @@ -112,6 +119,26 @@ type OutputNewRoomEvent struct { TransactionID *TransactionID `json:"transaction_id"` } +// AddsState returns all added state events from this event. +// +// This function is needed because `AddStateEvents` will not include a copy of +// the original event to save space, so you cannot use that slice alone. +// Instead, use this function which will add the original event if it is present +// in `AddsStateEventIDs`. +func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent { + includeOutputEvent := false + for _, id := range ore.AddsStateEventIDs { + if id == ore.Event.EventID() { + includeOutputEvent = true + break + } + } + if !includeOutputEvent { + return ore.AddStateEvents + } + return append(ore.AddStateEvents, ore.Event) +} + // An OutputNewInviteEvent is written whenever an invite becomes active. // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 1cf54144e..3e5cae1b6 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -2,6 +2,7 @@ package api import ( "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type PerformJoinRequest struct { @@ -22,3 +23,31 @@ type PerformLeaveRequest struct { type PerformLeaveResponse struct { } + +// PerformBackfillRequest is a request to PerformBackfill. +type PerformBackfillRequest struct { + // The room to backfill + RoomID string `json:"room_id"` + // A map of backwards extremity event ID to a list of its prev_event IDs. + BackwardsExtremities map[string][]string `json:"backwards_extremities"` + // The maximum number of events to retrieve. + Limit int `json:"limit"` + // The server interested in the events. + ServerName gomatrixserverlib.ServerName `json:"server_name"` +} + +// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. +func (r *PerformBackfillRequest) PrevEventIDs() []string { + var prevEventIDs []string + for _, pes := range r.BackwardsExtremities { + prevEventIDs = append(prevEventIDs, pes...) + } + prevEventIDs = util.UniqueStrings(prevEventIDs) + return prevEventIDs +} + +// PerformBackfillResponse is a response to PerformBackfill. +type PerformBackfillResponse struct { + // Missing events, arbritrary order. + Events []gomatrixserverlib.HeaderedEvent `json:"events"` +} diff --git a/roomserver/api/query.go b/roomserver/api/query.go index c9a46ae9e..b1525342e 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -18,7 +18,6 @@ package api import ( "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" ) // QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState @@ -204,34 +203,6 @@ type QueryStateAndAuthChainResponse struct { AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` } -// QueryBackfillRequest is a request to QueryBackfill. -type QueryBackfillRequest struct { - // The room to backfill - RoomID string `json:"room_id"` - // A map of backwards extremity event ID to a list of its prev_event IDs. - BackwardsExtremities map[string][]string `json:"backwards_extremities"` - // The maximum number of events to retrieve. - Limit int `json:"limit"` - // The server interested in the events. - ServerName gomatrixserverlib.ServerName `json:"server_name"` -} - -// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. -func (r *QueryBackfillRequest) PrevEventIDs() []string { - var prevEventIDs []string - for _, pes := range r.BackwardsExtremities { - prevEventIDs = append(prevEventIDs, pes...) - } - prevEventIDs = util.UniqueStrings(prevEventIDs) - return prevEventIDs -} - -// QueryBackfillResponse is a response to QueryBackfill. -type QueryBackfillResponse struct { - // Missing events, arbritrary order. - Events []gomatrixserverlib.HeaderedEvent `json:"events"` -} - // QueryRoomVersionCapabilitiesRequest asks for the default room version type QueryRoomVersionCapabilitiesRequest struct{} diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index aea85ca95..e69307ada 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -19,6 +19,7 @@ package internal import ( "bytes" "context" + "fmt" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" @@ -310,24 +311,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) TransactionID: u.transactionID, } - var stateEventNIDs []types.EventNID - for _, entry := range u.added { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.removed { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.stateBeforeEventRemoves { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - for _, entry := range u.stateBeforeEventAdds { - stateEventNIDs = append(stateEventNIDs, entry.EventNID) - } - stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] - eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs) + eventIDMap, err := u.stateEventMap() if err != nil { return nil, err } + for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } @@ -342,12 +330,60 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore.SendAsServer = u.sendAsServer + // include extra state events if they were added as nearly every downstream component will care about it + // and we'd rather not have them all hit QueryEventsByID at the same time! + if len(ore.AddsStateEventIDs) > 0 { + ore.AddStateEvents, err = u.extraEventsForIDs(roomVersion, ore.AddsStateEventIDs) + if err != nil { + return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) + } + } + return &api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, NewRoomEvent: &ore, }, nil } +// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being +// updated. +func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { + var extraEventIDs []string + for _, e := range eventIDs { + if e == u.event.EventID() { + continue + } + extraEventIDs = append(extraEventIDs, e) + } + if len(extraEventIDs) == 0 { + return nil, nil + } + extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs) + if err != nil { + return nil, err + } + var h []gomatrixserverlib.HeaderedEvent + for _, e := range extraEvents { + h = append(h, e.Headered(roomVersion)) + } + return h, nil +} + +// retrieve an event nid -> event ID map for all events that need updating +func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) { + var stateEventNIDs []types.EventNID + var allStateEntries []types.StateEntry + allStateEntries = append(allStateEntries, u.added...) + allStateEntries = append(allStateEntries, u.removed...) + allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...) + allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...) + for _, entry := range allStateEntries { + stateEventNIDs = append(stateEventNIDs, entry.EventNID) + } + stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))] + return u.api.DB.EventIDs(u.ctx, stateEventNIDs) +} + type eventNIDSorter []types.EventNID func (s eventNIDSorter) Len() int { return len(s) } diff --git a/roomserver/internal/query_backfill.go b/roomserver/internal/perform_backfill.go similarity index 100% rename from roomserver/internal/query_backfill.go rename to roomserver/internal/perform_backfill.go diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index aea933884..375ddc23c 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -441,11 +441,11 @@ func (r *RoomserverInternalAPI) QueryMissingEvents( return err } -// QueryBackfill implements api.RoomServerQueryAPI -func (r *RoomserverInternalAPI) QueryBackfill( +// PerformBackfill implements api.RoomServerQueryAPI +func (r *RoomserverInternalAPI) PerformBackfill( ctx context.Context, - request *api.QueryBackfillRequest, - response *api.QueryBackfillResponse, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, ) error { // if we are requesting the backfill then we need to do a federation hit // TODO: we could be more sensible and fetch as many events we already have then request the rest @@ -489,7 +489,7 @@ func (r *RoomserverInternalAPI) QueryBackfill( return err } -func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error { +func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error { roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID) if err != nil { return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err) @@ -647,7 +647,7 @@ func (r *RoomserverInternalAPI) scanEventTree( var pre string // TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be) - // Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing + // Currently, callers like PerformBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing // so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in // duplicate events being sent in response to /backfill requests. initialIgnoreList := make(map[string]bool, len(visited)) diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 6f5e882e7..1244300d4 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -24,8 +24,9 @@ const ( RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" // Perform operations - RoomserverPerformJoinPath = "/roomserver/performJoin" - RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformJoinPath = "/roomserver/performJoin" + RoomserverPerformLeavePath = "/roomserver/performLeave" + RoomserverPerformBackfillPath = "/roomserver/performBackfill" // Query operations RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState" @@ -36,7 +37,6 @@ const ( RoomserverQueryServerAllowedToSeeEventPath = "/roomserver/queryServerAllowedToSeeEvent" RoomserverQueryMissingEventsPath = "/roomserver/queryMissingEvents" RoomserverQueryStateAndAuthChainPath = "/roomserver/queryStateAndAuthChain" - RoomserverQueryBackfillPath = "/roomserver/queryBackfill" RoomserverQueryRoomVersionCapabilitiesPath = "/roomserver/queryRoomVersionCapabilities" RoomserverQueryRoomVersionForRoomPath = "/roomserver/queryRoomVersionForRoom" ) @@ -274,16 +274,16 @@ func (h *httpRoomserverInternalAPI) QueryStateAndAuthChain( return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } -// QueryBackfill implements RoomServerQueryAPI -func (h *httpRoomserverInternalAPI) QueryBackfill( +// PerformBackfill implements RoomServerQueryAPI +func (h *httpRoomserverInternalAPI) PerformBackfill( ctx context.Context, - request *api.QueryBackfillRequest, - response *api.QueryBackfillResponse, + request *api.PerformBackfillRequest, + response *api.PerformBackfillResponse, ) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBackfill") defer span.Finish() - apiURL := h.roomserverURL + RoomserverQueryBackfillPath + apiURL := h.roomserverURL + RoomserverPerformBackfillPath return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go index 3a13ce371..8ac815f3e 100644 --- a/roomserver/inthttp/server.go +++ b/roomserver/inthttp/server.go @@ -165,14 +165,14 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) { }), ) internalAPIMux.Handle( - RoomserverQueryBackfillPath, - internal.MakeInternalAPI("QueryBackfill", func(req *http.Request) util.JSONResponse { - var request api.QueryBackfillRequest - var response api.QueryBackfillResponse + RoomserverPerformBackfillPath, + internal.MakeInternalAPI("PerformBackfill", func(req *http.Request) util.JSONResponse { + var request api.PerformBackfillRequest + var response api.PerformBackfillResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { return util.ErrorResponse(err) } - if err := r.QueryBackfill(req.Context(), &request, &response); err != nil { + if err := r.PerformBackfill(req.Context(), &request, &response); err != nil { return util.ErrorResponse(err) } return util.JSONResponse{Code: http.StatusOK, JSON: &response} diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 055f76603..135976823 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -17,7 +17,6 @@ package consumers import ( "context" "encoding/json" - "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" @@ -105,17 +104,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( "room_version": ev.RoomVersion, }).Info("received event from roomserver") - addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev) - if err != nil { - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": msg.AddsStateEventIDs, - "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: state event lookup failure") - } + addsStateEvents := msg.AddsState() - ev, err = s.updateStateEvent(ev) + ev, err := s.updateStateEvent(ev) if err != nil { return err } @@ -185,63 +176,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( return nil } -// lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent, -) ([]gomatrixserverlib.HeaderedEvent, error) { - // Fast path if there aren't any new state events. - if len(addsStateEventIDs) == 0 { - return nil, nil - } - - // Fast path if the only state event added is the event itself. - if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.HeaderedEvent{event}, nil - } - - // Check if this is re-adding a state events that we previously processed - // If we have previously received a state event it may still be in - // our event database. - result, err := s.db.Events(context.TODO(), addsStateEventIDs) - if err != nil { - return nil, err - } - missing := missingEventsFrom(result, addsStateEventIDs) - - // Check if event itself is being added. - for _, eventID := range missing { - if eventID == event.EventID() { - result = append(result, event) - break - } - } - missing = missingEventsFrom(result, addsStateEventIDs) - - if len(missing) == 0 { - return result, nil - } - - // At this point the missing events are neither the event itself nor are - // they present in our local database. Our only option is to fetch them - // from the roomserver using the query API. - eventReq := api.QueryEventsByIDRequest{EventIDs: missing} - var eventResp api.QueryEventsByIDResponse - if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { - return nil, err - } - - result = append(result, eventResp.Events...) - missing = missingEventsFrom(result, addsStateEventIDs) - - if len(missing) != 0 { - return nil, fmt.Errorf( - "missing %d state events IDs at event %q", len(missing), event.EventID(), - ) - } - - return result, nil -} - func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { var stateKey string if event.StateKey() == nil { @@ -270,17 +204,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade event.Event, err = event.SetUnsigned(prev) return event, err } - -func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string { - have := map[string]bool{} - for _, event := range events { - have[event.EventID()] = true - } - var missing []string - for _, eventID := range required { - if !have[eventID] { - missing = append(missing, eventID) - } - } - return missing -} diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 8c8976345..de5429db4 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -375,15 +375,15 @@ func (e eventsByDepth) Less(i, j int) bool { // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) { - var res api.QueryBackfillResponse - err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{ + var res api.PerformBackfillResponse + err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{ RoomID: roomID, BackwardsExtremities: backwardsExtremities, Limit: limit, ServerName: r.cfg.Matrix.ServerName, }, &res) if err != nil { - return nil, fmt.Errorf("QueryBackfill failed: %w", err) + return nil, fmt.Errorf("PerformBackfill failed: %w", err) } util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")