mirror of
https://github.com/matrix-org/dendrite
synced 2025-01-07 18:58:43 +00:00
eb9e90379d
Companion to https://github.com/matrix-org/gomatrixserverlib/pull/400 This tries to mimic the logic found in Synapse, as dropping events can break rooms (and we may end up in endless loops..)
987 lines
36 KiB
Go
987 lines
36 KiB
Go
package input
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
|
"github.com/matrix-org/util"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
|
"github.com/matrix-org/dendrite/internal"
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/roomserver/state"
|
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
|
"github.com/matrix-org/dendrite/roomserver/types"
|
|
)
|
|
|
|
type parsedRespState struct {
|
|
AuthEvents []gomatrixserverlib.PDU
|
|
StateEvents []gomatrixserverlib.PDU
|
|
}
|
|
|
|
func (p *parsedRespState) Events() []gomatrixserverlib.PDU {
|
|
eventsByID := make(map[string]gomatrixserverlib.PDU, len(p.AuthEvents)+len(p.StateEvents))
|
|
for i, event := range p.AuthEvents {
|
|
eventsByID[event.EventID()] = p.AuthEvents[i]
|
|
}
|
|
for i, event := range p.StateEvents {
|
|
eventsByID[event.EventID()] = p.StateEvents[i]
|
|
}
|
|
allEvents := make([]gomatrixserverlib.PDU, 0, len(eventsByID))
|
|
for _, event := range eventsByID {
|
|
allEvents = append(allEvents, event)
|
|
}
|
|
return gomatrixserverlib.ReverseTopologicalOrdering(
|
|
gomatrixserverlib.ToPDUs(allEvents), gomatrixserverlib.TopologicalOrderByAuthEvents)
|
|
}
|
|
|
|
type missingStateReq struct {
|
|
log *logrus.Entry
|
|
virtualHost spec.ServerName
|
|
origin spec.ServerName
|
|
db storage.RoomDatabase
|
|
roomInfo *types.RoomInfo
|
|
inputer *Inputer
|
|
keys gomatrixserverlib.JSONVerifier
|
|
federation fedapi.RoomserverFederationAPI
|
|
roomsMu *internal.MutexByRoom
|
|
servers []spec.ServerName
|
|
hadEvents map[string]bool
|
|
hadEventsMutex sync.Mutex
|
|
haveEvents map[string]gomatrixserverlib.PDU
|
|
haveEventsMutex sync.Mutex
|
|
}
|
|
|
|
// processEventWithMissingState is the entrypoint for a missingStateReq
|
|
// request, as called from processRoomEvent.
|
|
// nolint:gocyclo
|
|
func (t *missingStateReq) processEventWithMissingState(
|
|
ctx context.Context, e gomatrixserverlib.PDU, roomVersion gomatrixserverlib.RoomVersion,
|
|
) (*parsedRespState, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
|
|
defer trace.EndRegion()
|
|
|
|
// We are missing the previous events for this events.
|
|
// This means that there is a gap in our view of the history of the
|
|
// room. There two ways that we can handle such a gap:
|
|
// 1) We can fill in the gap using /get_missing_events
|
|
// 2) We can leave the gap and request the state of the room at
|
|
// this event from the remote server using either /state_ids
|
|
// or /state.
|
|
// Synapse will attempt to do 1 and if that fails or if the gap is
|
|
// too large then it will attempt 2.
|
|
// Synapse will use /state_ids if possible since usually the state
|
|
// is largely unchanged and it is more efficient to fetch a list of
|
|
// event ids and then use /event to fetch the individual events.
|
|
// However not all version of synapse support /state_ids so you may
|
|
// need to fallback to /state.
|
|
t.log = util.GetLogger(ctx).WithFields(map[string]interface{}{
|
|
"txn_event": e.EventID(),
|
|
"room_id": e.RoomID(),
|
|
"txn_prev_events": e.PrevEventIDs(),
|
|
})
|
|
|
|
// Attempt to fill in the gap using /get_missing_events
|
|
// This will either:
|
|
// - fill in the gap completely then process event `e` returning no backwards extremity
|
|
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
|
|
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
|
|
newEvents, isGapFilled, prevStatesKnown, err := t.getMissingEvents(ctx, e, roomVersion)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("t.getMissingEvents: %w", err)
|
|
}
|
|
if len(newEvents) == 0 {
|
|
return nil, fmt.Errorf("expected to find missing events but didn't")
|
|
}
|
|
if isGapFilled {
|
|
t.log.Infof("Gap filled by /get_missing_events, injecting %d new events", len(newEvents))
|
|
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
|
|
// in the gap in the DAG
|
|
for _, newEvent := range newEvents {
|
|
err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{
|
|
Kind: api.KindOld,
|
|
Event: &types.HeaderedEvent{PDU: newEvent},
|
|
Origin: t.origin,
|
|
SendAsServer: api.DoNotSendToOtherServers,
|
|
})
|
|
if err != nil {
|
|
if _, ok := err.(types.RejectedError); !ok {
|
|
return nil, fmt.Errorf("t.inputer.processRoomEvent (filling gap): %w", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// If we filled the gap *and* we know the state before the prev events
|
|
// then there's nothing else to do, we have everything we need to deal
|
|
// with the new event.
|
|
if isGapFilled && prevStatesKnown {
|
|
t.log.Infof("Gap filled and state found for all prev events")
|
|
return nil, nil
|
|
}
|
|
|
|
// Otherwise, if we've reached this point, it's possible that we've
|
|
// either not closed the gap, or we did but we still don't seem to
|
|
// know the events before the new event. Start by looking up the
|
|
// state at the event at the back of the gap and we'll try to roll
|
|
// forward the state first.
|
|
backwardsExtremity := newEvents[0]
|
|
newEvents = newEvents[1:]
|
|
|
|
resolvedState, err := t.lookupResolvedStateBeforeEvent(ctx, backwardsExtremity, roomVersion)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("t.lookupState (backwards extremity): %w", err)
|
|
}
|
|
|
|
hadEvents := map[string]bool{}
|
|
t.hadEventsMutex.Lock()
|
|
for k, v := range t.hadEvents {
|
|
hadEvents[k] = v
|
|
}
|
|
t.hadEventsMutex.Unlock()
|
|
|
|
sendOutliers := func(resolvedState *parsedRespState) error {
|
|
outliers := resolvedState.Events()
|
|
outlierRoomEvents := make([]api.InputRoomEvent, 0, len(outliers))
|
|
for _, outlier := range outliers {
|
|
if hadEvents[outlier.EventID()] {
|
|
continue
|
|
}
|
|
outlierRoomEvents = append(outlierRoomEvents, api.InputRoomEvent{
|
|
Kind: api.KindOutlier,
|
|
Event: &types.HeaderedEvent{PDU: outlier},
|
|
Origin: t.origin,
|
|
})
|
|
}
|
|
for _, ire := range outlierRoomEvents {
|
|
if err = t.inputer.processRoomEvent(ctx, t.virtualHost, &ire); err != nil {
|
|
if _, ok := err.(types.RejectedError); !ok {
|
|
return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Send outliers first so we can send the state along with the new backwards
|
|
// extremity without any missing auth events.
|
|
if err = sendOutliers(resolvedState); err != nil {
|
|
return nil, fmt.Errorf("sendOutliers: %w", err)
|
|
}
|
|
|
|
// Now send the backward extremity into the roomserver with the
|
|
// newly resolved state. This marks the "oldest" point in the backfill and
|
|
// sets the baseline state for any new events after this.
|
|
stateIDs := make([]string, 0, len(resolvedState.StateEvents))
|
|
for _, event := range resolvedState.StateEvents {
|
|
stateIDs = append(stateIDs, event.EventID())
|
|
}
|
|
|
|
err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{
|
|
Kind: api.KindOld,
|
|
Event: &types.HeaderedEvent{PDU: backwardsExtremity},
|
|
Origin: t.origin,
|
|
HasState: true,
|
|
StateEventIDs: stateIDs,
|
|
SendAsServer: api.DoNotSendToOtherServers,
|
|
})
|
|
if err != nil {
|
|
if _, ok := err.(types.RejectedError); !ok {
|
|
return nil, fmt.Errorf("t.inputer.processRoomEvent (backward extremity): %w", err)
|
|
}
|
|
}
|
|
|
|
// Then send all of the newer backfilled events, of which will all be newer
|
|
// than the backward extremity, into the roomserver without state. This way
|
|
// they will automatically fast-forward based on the room state at the
|
|
// extremity in the last step.
|
|
for _, newEvent := range newEvents {
|
|
err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{
|
|
Kind: api.KindOld,
|
|
Event: &types.HeaderedEvent{PDU: newEvent},
|
|
Origin: t.origin,
|
|
SendAsServer: api.DoNotSendToOtherServers,
|
|
})
|
|
if err != nil {
|
|
if _, ok := err.(types.RejectedError); !ok {
|
|
return nil, fmt.Errorf("t.inputer.processRoomEvent (fast forward): %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finally, check again if we know everything we need to know in order to
|
|
// make forward progress. If the prev state is known then we consider the
|
|
// rolled forward state to be sufficient — we now know all of the state
|
|
// before the prev events. If we don't then we need to look up the state
|
|
// before the new event as well, otherwise we will never make any progress.
|
|
if t.isPrevStateKnown(ctx, e) {
|
|
return nil, nil
|
|
}
|
|
|
|
// If we still haven't got the state for the prev events then we'll go and
|
|
// ask the federation for it if needed.
|
|
resolvedState, err = t.lookupResolvedStateBeforeEvent(ctx, e, roomVersion)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("t.lookupState (new event): %w", err)
|
|
}
|
|
|
|
// Send the outliers for the retrieved state.
|
|
if err = sendOutliers(resolvedState); err != nil {
|
|
return nil, fmt.Errorf("sendOutliers: %w", err)
|
|
}
|
|
|
|
// Then return the resolved state, for which the caller can replace the
|
|
// HasState with the event IDs to create a new state snapshot when we
|
|
// process the new event.
|
|
return resolvedState, nil
|
|
}
|
|
|
|
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e gomatrixserverlib.PDU, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupResolvedStateBeforeEvent")
|
|
defer trace.EndRegion()
|
|
|
|
type respState struct {
|
|
// A snapshot is considered trustworthy if it came from our own roomserver.
|
|
// That's because the state will have been through state resolution once
|
|
// already in QueryStateAfterEvent.
|
|
trustworthy bool
|
|
*parsedRespState
|
|
}
|
|
|
|
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
|
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
|
|
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
|
|
var states []*respState
|
|
var validationError error
|
|
for _, prevEventID := range e.PrevEventIDs() {
|
|
// Look up what the state is after the backward extremity. This will either
|
|
// come from the roomserver, if we know all the required events, or it will
|
|
// come from a remote server via /state_ids if not.
|
|
prevState, trustworthy, err := t.lookupStateAfterEvent(ctx, roomVersion, e.RoomID(), prevEventID)
|
|
switch err2 := err.(type) {
|
|
case gomatrixserverlib.EventValidationError:
|
|
if !err2.Persistable {
|
|
return nil, err2
|
|
}
|
|
validationError = err2
|
|
case nil:
|
|
default:
|
|
return nil, fmt.Errorf("t.lookupStateAfterEvent: %w", err)
|
|
}
|
|
// Append the state onto the collected state. We'll run this through the
|
|
// state resolution next.
|
|
states = append(states, &respState{trustworthy, prevState})
|
|
}
|
|
|
|
// Now that we have collected all of the state from the prev_events, we'll
|
|
// run the state through the appropriate state resolution algorithm for the
|
|
// room if needed. This does a couple of things:
|
|
// 1. Ensures that the state is deduplicated fully for each state-key tuple
|
|
// 2. Ensures that we pick the latest events from both sets, in the case that
|
|
// one of the prev_events is quite a bit older than the others
|
|
resolvedState := &parsedRespState{}
|
|
switch len(states) {
|
|
case 0:
|
|
extremityIsCreate := e.Type() == spec.MRoomCreate && e.StateKeyEquals("")
|
|
if !extremityIsCreate {
|
|
// There are no previous states and this isn't the beginning of the
|
|
// room - this is an error condition!
|
|
return nil, fmt.Errorf("expected %d states but got %d", len(e.PrevEventIDs()), len(states))
|
|
}
|
|
case 1:
|
|
// There's only one previous state - if it's trustworthy (came from a
|
|
// local state snapshot which will already have been through state res),
|
|
// use it as-is. There's no point in resolving it again. Only trust a
|
|
// trustworthy state snapshot if it actually contains some state for all
|
|
// non-create events, otherwise we need to resolve what came from federation.
|
|
isCreate := e.Type() == spec.MRoomCreate && e.StateKeyEquals("")
|
|
if states[0].trustworthy && (isCreate || len(states[0].StateEvents) > 0) {
|
|
resolvedState = states[0].parsedRespState
|
|
break
|
|
}
|
|
// Otherwise, if it isn't trustworthy (came from federation), run it through
|
|
// state resolution anyway for safety, in case there are duplicates.
|
|
fallthrough
|
|
default:
|
|
respStates := make([]*parsedRespState, len(states))
|
|
for i := range states {
|
|
respStates[i] = states[i].parsedRespState
|
|
}
|
|
// There's more than one previous state - run them all through state res
|
|
var err error
|
|
t.roomsMu.Lock(e.RoomID())
|
|
resolvedState, err = t.resolveStatesAndCheck(ctx, roomVersion, respStates, e)
|
|
t.roomsMu.Unlock(e.RoomID())
|
|
switch err2 := err.(type) {
|
|
case gomatrixserverlib.EventValidationError:
|
|
if !err2.Persistable {
|
|
return nil, err2
|
|
}
|
|
validationError = err2
|
|
case nil:
|
|
default:
|
|
return nil, fmt.Errorf("t.resolveStatesAndCheck: %w", err)
|
|
}
|
|
}
|
|
|
|
return resolvedState, validationError
|
|
}
|
|
|
|
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
|
// added into the mix.
|
|
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEvent")
|
|
defer trace.EndRegion()
|
|
|
|
// try doing all this locally before we resort to querying federation
|
|
respState := t.lookupStateAfterEventLocally(ctx, eventID)
|
|
if respState != nil {
|
|
return respState, true, nil
|
|
}
|
|
|
|
logrus.WithContext(ctx).Warnf("State for event %s not available locally, falling back to federation (via %d servers)", eventID, len(t.servers))
|
|
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
|
|
if err != nil {
|
|
logrus.WithContext(ctx).WithError(err).Errorf("Failed to look up state before event %s", eventID)
|
|
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
|
|
}
|
|
|
|
// fetch the event we're missing and add it to the pile
|
|
var validationError error
|
|
h, err := t.lookupEvent(ctx, roomVersion, roomID, eventID, false)
|
|
switch e := err.(type) {
|
|
case gomatrixserverlib.EventValidationError:
|
|
if !e.Persistable {
|
|
logrus.WithContext(ctx).WithError(err).Errorf("Failed to look up event %s", eventID)
|
|
return nil, false, e
|
|
}
|
|
validationError = e
|
|
case verifySigError:
|
|
return respState, false, nil
|
|
case nil:
|
|
// do nothing
|
|
default:
|
|
logrus.WithContext(ctx).WithError(err).Errorf("Failed to look up event %s", eventID)
|
|
return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
|
|
}
|
|
h = t.cacheAndReturn(h)
|
|
if h.StateKey() != nil {
|
|
addedToState := false
|
|
for i := range respState.StateEvents {
|
|
se := respState.StateEvents[i]
|
|
if se.Type() == h.Type() && se.StateKeyEquals(*h.StateKey()) {
|
|
respState.StateEvents[i] = h
|
|
addedToState = true
|
|
break
|
|
}
|
|
}
|
|
if !addedToState {
|
|
respState.StateEvents = append(respState.StateEvents, h)
|
|
}
|
|
}
|
|
|
|
return respState, false, validationError
|
|
}
|
|
|
|
func (t *missingStateReq) cacheAndReturn(ev gomatrixserverlib.PDU) gomatrixserverlib.PDU {
|
|
t.haveEventsMutex.Lock()
|
|
defer t.haveEventsMutex.Unlock()
|
|
if cached, exists := t.haveEvents[ev.EventID()]; exists {
|
|
return cached
|
|
}
|
|
t.haveEvents[ev.EventID()] = ev
|
|
return ev
|
|
}
|
|
|
|
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, eventID string) *parsedRespState {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEventLocally")
|
|
defer trace.EndRegion()
|
|
|
|
var res parsedRespState
|
|
roomState := state.NewStateResolution(t.db, t.roomInfo, t.inputer.Queryer)
|
|
stateAtEvents, err := t.db.StateAtEventIDs(ctx, []string{eventID})
|
|
if err != nil {
|
|
t.log.WithError(err).Warnf("failed to get state after %s locally", eventID)
|
|
return nil
|
|
}
|
|
stateEntries, err := roomState.LoadCombinedStateAfterEvents(ctx, stateAtEvents)
|
|
if err != nil {
|
|
t.log.WithError(err).Warnf("failed to load combined state after %s locally", eventID)
|
|
return nil
|
|
}
|
|
stateEventNIDs := make([]types.EventNID, 0, len(stateEntries))
|
|
for _, entry := range stateEntries {
|
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
|
}
|
|
if t.roomInfo == nil {
|
|
return nil
|
|
}
|
|
stateEvents, err := t.db.Events(ctx, t.roomInfo.RoomVersion, stateEventNIDs)
|
|
if err != nil {
|
|
t.log.WithError(err).Warnf("failed to load state events locally")
|
|
return nil
|
|
}
|
|
res.StateEvents = make([]gomatrixserverlib.PDU, 0, len(stateEvents))
|
|
for _, ev := range stateEvents {
|
|
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
|
|
// processEvent request, which is better for memory.
|
|
res.StateEvents = append(res.StateEvents, t.cacheAndReturn(ev.PDU))
|
|
t.hadEvent(ev.EventID())
|
|
}
|
|
|
|
// encourage GC
|
|
stateEvents, stateEventNIDs, stateEntries, stateAtEvents = nil, nil, nil, nil // nolint:ineffassign
|
|
|
|
missingAuthEvents := map[string]bool{}
|
|
res.AuthEvents = make([]gomatrixserverlib.PDU, 0, len(stateEvents)*3)
|
|
for _, ev := range stateEvents {
|
|
t.haveEventsMutex.Lock()
|
|
for _, ae := range ev.AuthEventIDs() {
|
|
if aev, ok := t.haveEvents[ae]; ok {
|
|
res.AuthEvents = append(res.AuthEvents, aev)
|
|
} else {
|
|
missingAuthEvents[ae] = true
|
|
}
|
|
}
|
|
t.haveEventsMutex.Unlock()
|
|
}
|
|
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
|
|
// have stored the event.
|
|
if len(missingAuthEvents) > 0 {
|
|
var missingEventList []string
|
|
for evID := range missingAuthEvents {
|
|
missingEventList = append(missingEventList, evID)
|
|
}
|
|
t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
|
|
events, err := t.db.EventsFromIDs(ctx, t.roomInfo, missingEventList)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
for i, ev := range events {
|
|
res.AuthEvents = append(res.AuthEvents, t.cacheAndReturn(events[i].PDU))
|
|
t.hadEvent(ev.EventID())
|
|
}
|
|
}
|
|
|
|
return &res
|
|
}
|
|
|
|
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
|
|
// the server supports.
|
|
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
|
|
*parsedRespState, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupStateBeforeEvent")
|
|
defer trace.EndRegion()
|
|
|
|
// Attempt to fetch the missing state using /state_ids and /events
|
|
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
|
|
}
|
|
|
|
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity gomatrixserverlib.PDU) (*parsedRespState, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "resolveStatesAndCheck")
|
|
defer trace.EndRegion()
|
|
|
|
var authEventList []gomatrixserverlib.PDU
|
|
var stateEventList []gomatrixserverlib.PDU
|
|
for _, state := range states {
|
|
authEventList = append(authEventList, state.AuthEvents...)
|
|
stateEventList = append(stateEventList, state.StateEvents...)
|
|
}
|
|
resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(
|
|
roomVersion, gomatrixserverlib.ToPDUs(stateEventList), gomatrixserverlib.ToPDUs(authEventList), func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
return t.inputer.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// apply the current event
|
|
var validationError error
|
|
retryAllowedState:
|
|
if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
return t.inputer.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
|
|
}); err != nil {
|
|
switch missing := err.(type) {
|
|
case gomatrixserverlib.MissingAuthEventError:
|
|
h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
|
|
switch e := err2.(type) {
|
|
case gomatrixserverlib.EventValidationError:
|
|
if !e.Persistable {
|
|
return nil, e
|
|
}
|
|
validationError = e
|
|
case verifySigError:
|
|
return &parsedRespState{
|
|
AuthEvents: authEventList,
|
|
StateEvents: resolvedStateEvents,
|
|
}, nil
|
|
case nil:
|
|
// do nothing
|
|
default:
|
|
return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2)
|
|
}
|
|
t.log.Tracef("fetched event %s", missing.AuthEventID)
|
|
resolvedStateEvents = append(resolvedStateEvents, h)
|
|
goto retryAllowedState
|
|
default:
|
|
}
|
|
return nil, err
|
|
}
|
|
return &parsedRespState{
|
|
AuthEvents: authEventList,
|
|
StateEvents: resolvedStateEvents,
|
|
}, validationError
|
|
}
|
|
|
|
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
|
|
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
|
|
func (t *missingStateReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.PDU, roomVersion gomatrixserverlib.RoomVersion) (newEvents []gomatrixserverlib.PDU, isGapFilled, prevStateKnown bool, err error) {
|
|
trace, ctx := internal.StartRegion(ctx, "getMissingEvents")
|
|
defer trace.EndRegion()
|
|
|
|
logger := t.log.WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
|
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
|
|
if err != nil {
|
|
return nil, false, false, fmt.Errorf("t.DB.LatestEventIDs: %w", err)
|
|
}
|
|
latestEvents := make([]string, len(latest))
|
|
for i := range latest {
|
|
latestEvents[i] = latest[i]
|
|
t.hadEvent(latest[i])
|
|
}
|
|
|
|
var missingResp *fclient.RespMissingEvents
|
|
for _, server := range t.servers {
|
|
var m fclient.RespMissingEvents
|
|
if m, err = t.federation.LookupMissingEvents(ctx, t.virtualHost, server, e.RoomID(), fclient.MissingEvents{
|
|
Limit: 20,
|
|
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
|
|
EarliestEvents: latestEvents,
|
|
// The event IDs to retrieve the previous events for.
|
|
LatestEvents: []string{e.EventID()},
|
|
}, roomVersion); err == nil {
|
|
missingResp = &m
|
|
break
|
|
} else {
|
|
logger.WithError(err).Warnf("%s pushed us an event but %q did not respond to /get_missing_events", t.origin, server)
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
select {
|
|
case <-ctx.Done(): // the parent request context timed out
|
|
return nil, false, false, context.DeadlineExceeded
|
|
default: // this request exceed its own timeout
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if missingResp == nil {
|
|
logger.WithError(err).Warnf(
|
|
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
|
t.origin, len(t.servers),
|
|
)
|
|
return nil, false, false, missingPrevEventsError{
|
|
eventID: e.EventID(),
|
|
err: err,
|
|
}
|
|
}
|
|
|
|
// Make sure events from the missingResp are using the cache - missing events
|
|
// will be added and duplicates will be removed.
|
|
missingEvents := make([]gomatrixserverlib.PDU, 0, len(missingResp.Events))
|
|
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
|
if err = gomatrixserverlib.VerifyEventSignatures(ctx, ev, t.keys, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
return t.inputer.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
|
|
}); err != nil {
|
|
continue
|
|
}
|
|
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
|
}
|
|
logger.Debugf("get_missing_events returned %d events (%d passed signature checks)", len(missingResp.Events), len(missingEvents))
|
|
|
|
// topologically sort and sanity check that we are making forward progress
|
|
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(
|
|
gomatrixserverlib.ToPDUs(missingEvents), gomatrixserverlib.TopologicalOrderByPrevEvents)
|
|
shouldHaveSomeEventIDs := e.PrevEventIDs()
|
|
hasPrevEvent := false
|
|
Event:
|
|
for _, pe := range shouldHaveSomeEventIDs {
|
|
for _, ev := range newEvents {
|
|
if ev.EventID() == pe {
|
|
hasPrevEvent = true
|
|
break Event
|
|
}
|
|
}
|
|
}
|
|
if !hasPrevEvent {
|
|
err = fmt.Errorf("called /get_missing_events but server %s didn't return any prev_events with IDs %v", t.origin, shouldHaveSomeEventIDs)
|
|
logger.WithError(err).Warnf(
|
|
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
|
t.origin,
|
|
)
|
|
return nil, false, false, missingPrevEventsError{
|
|
eventID: e.EventID(),
|
|
err: err,
|
|
}
|
|
}
|
|
if len(newEvents) == 0 {
|
|
return nil, false, false, nil // TODO: error instead?
|
|
}
|
|
|
|
earliestNewEvent := newEvents[0]
|
|
|
|
// If we retrieved back to the beginning of the room then there's nothing else
|
|
// to do - we closed the gap.
|
|
if len(earliestNewEvent.PrevEventIDs()) == 0 && earliestNewEvent.Type() == spec.MRoomCreate && earliestNewEvent.StateKeyEquals("") {
|
|
return newEvents, true, t.isPrevStateKnown(ctx, e), nil
|
|
}
|
|
|
|
// If our backward extremity was not a known event to us then we obviously didn't
|
|
// close the gap.
|
|
if state, err := t.db.StateAtEventIDs(ctx, []string{earliestNewEvent.EventID()}); err != nil || len(state) == 0 && state[0].BeforeStateSnapshotNID == 0 {
|
|
return newEvents, false, false, nil
|
|
}
|
|
|
|
// At this point we are satisfied that we know the state both at the earliest
|
|
// retrieved event and at the prev events of the new event.
|
|
return newEvents, true, t.isPrevStateKnown(ctx, e), nil
|
|
}
|
|
|
|
func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e gomatrixserverlib.PDU) bool {
|
|
expected := len(e.PrevEventIDs())
|
|
state, err := t.db.StateAtEventIDs(ctx, e.PrevEventIDs())
|
|
if err != nil || len(state) != expected {
|
|
// We didn't get as many state snapshots as we expected, or there was an error,
|
|
// so we haven't completely solved the problem for the new event.
|
|
return false
|
|
}
|
|
// Check to see if we have a populated state snapshot for all of the prev events.
|
|
for _, stateAtEvent := range state {
|
|
if stateAtEvent.BeforeStateSnapshotNID == 0 {
|
|
// One of the prev events still has unknown state, so we haven't really
|
|
// solved the problem.
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (t *missingStateReq) lookupMissingStateViaState(
|
|
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
|
|
) (respState *parsedRespState, err error) {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaState")
|
|
defer trace.EndRegion()
|
|
|
|
state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check that the returned state is valid.
|
|
authEvents, stateEvents, err := gomatrixserverlib.CheckStateResponse(ctx, &fclient.RespState{
|
|
StateEvents: state.GetStateEvents(),
|
|
AuthEvents: state.GetAuthEvents(),
|
|
}, roomVersion, t.keys, nil, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
return t.inputer.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &parsedRespState{
|
|
AuthEvents: authEvents,
|
|
StateEvents: stateEvents,
|
|
}, nil
|
|
}
|
|
|
|
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
|
*parsedRespState, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaStateIDs")
|
|
defer trace.EndRegion()
|
|
|
|
t.log.Infof("lookupMissingStateViaStateIDs %s", eventID)
|
|
// fetch the state event IDs at the time of the event
|
|
var stateIDs gomatrixserverlib.StateIDResponse
|
|
var err error
|
|
count := 0
|
|
totalctx, totalcancel := context.WithTimeout(ctx, time.Minute*5)
|
|
for _, serverName := range t.servers {
|
|
reqctx, reqcancel := context.WithTimeout(totalctx, time.Second*20)
|
|
stateIDs, err = t.federation.LookupStateIDs(reqctx, t.virtualHost, serverName, roomID, eventID)
|
|
reqcancel()
|
|
if err == nil {
|
|
break
|
|
}
|
|
count++
|
|
}
|
|
totalcancel()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("t.federation.LookupStateIDs tried %d server(s), last error: %w", count, err)
|
|
}
|
|
// work out which auth/state IDs are missing
|
|
wantIDs := append(stateIDs.GetStateEventIDs(), stateIDs.GetAuthEventIDs()...)
|
|
missing := make(map[string]bool)
|
|
var missingEventList []string
|
|
t.haveEventsMutex.Lock()
|
|
for _, sid := range wantIDs {
|
|
if _, ok := t.haveEvents[sid]; !ok {
|
|
if !missing[sid] {
|
|
missing[sid] = true
|
|
missingEventList = append(missingEventList, sid)
|
|
}
|
|
}
|
|
}
|
|
t.haveEventsMutex.Unlock()
|
|
|
|
events, err := t.db.EventsFromIDs(ctx, t.roomInfo, missingEventList)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
|
|
}
|
|
|
|
for i, ev := range events {
|
|
events[i].PDU = t.cacheAndReturn(events[i].PDU)
|
|
t.hadEvent(ev.EventID())
|
|
evID := events[i].EventID()
|
|
if missing[evID] {
|
|
delete(missing, evID)
|
|
}
|
|
}
|
|
|
|
// encourage GC
|
|
events = nil // nolint:ineffassign
|
|
|
|
concurrentRequests := 8
|
|
missingCount := len(missing)
|
|
t.log.WithField("event_id", eventID).Debugf("lookupMissingStateViaStateIDs missing %d/%d events", missingCount, len(wantIDs))
|
|
|
|
// If over 50% of the auth/state events from /state_ids are missing
|
|
// then we'll just call /state instead, otherwise we'll just end up
|
|
// hammering the remote side with /event requests unnecessarily.
|
|
if missingCount > concurrentRequests && missingCount > len(wantIDs)/2 {
|
|
t.log.WithFields(logrus.Fields{
|
|
"missing": missingCount,
|
|
"event_id": eventID,
|
|
"total_state": len(stateIDs.GetStateEventIDs()),
|
|
"total_auth_events": len(stateIDs.GetAuthEventIDs()),
|
|
}).Debug("Fetching all state at event")
|
|
return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion)
|
|
}
|
|
|
|
if missingCount > 0 {
|
|
t.log.WithFields(logrus.Fields{
|
|
"missing": missingCount,
|
|
"event_id": eventID,
|
|
"total_state": len(stateIDs.GetStateEventIDs()),
|
|
"total_auth_events": len(stateIDs.GetAuthEventIDs()),
|
|
"concurrent_requests": concurrentRequests,
|
|
}).Debug("Fetching missing state at event")
|
|
|
|
// Create a queue containing all of the missing event IDs that we want
|
|
// to retrieve.
|
|
pending := make(chan string, missingCount)
|
|
for missingEventID := range missing {
|
|
pending <- missingEventID
|
|
}
|
|
close(pending)
|
|
|
|
// Define how many workers we should start to do this.
|
|
if missingCount < concurrentRequests {
|
|
concurrentRequests = missingCount
|
|
}
|
|
|
|
// Create the wait group.
|
|
var fetchgroup sync.WaitGroup
|
|
fetchgroup.Add(concurrentRequests)
|
|
|
|
// This is the only place where we'll write to t.haveEvents from
|
|
// multiple goroutines, and everywhere else is blocked on this
|
|
// synchronous function anyway.
|
|
var haveEventsMutex sync.Mutex
|
|
|
|
// Define what we'll do in order to fetch the missing event ID.
|
|
fetch := func(missingEventID string) {
|
|
h, herr := t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
|
|
switch e := herr.(type) {
|
|
case gomatrixserverlib.EventValidationError:
|
|
if !e.Persistable {
|
|
return
|
|
}
|
|
case verifySigError:
|
|
return
|
|
case nil:
|
|
break
|
|
default:
|
|
t.log.WithFields(logrus.Fields{
|
|
"missing_event_id": missingEventID,
|
|
}).WithError(herr).Warn("Failed to fetch missing event")
|
|
return
|
|
}
|
|
haveEventsMutex.Lock()
|
|
t.cacheAndReturn(h)
|
|
haveEventsMutex.Unlock()
|
|
}
|
|
|
|
// Create the worker.
|
|
worker := func(ch <-chan string) {
|
|
defer fetchgroup.Done()
|
|
for missingEventID := range ch {
|
|
fetch(missingEventID)
|
|
}
|
|
}
|
|
|
|
// Start the workers.
|
|
for i := 0; i < concurrentRequests; i++ {
|
|
go worker(pending)
|
|
}
|
|
|
|
// Wait for the workers to finish.
|
|
fetchgroup.Wait()
|
|
}
|
|
|
|
resp, err := t.createRespStateFromStateIDs(stateIDs)
|
|
return resp, err
|
|
}
|
|
|
|
func (t *missingStateReq) createRespStateFromStateIDs(
|
|
stateIDs gomatrixserverlib.StateIDResponse,
|
|
) (*parsedRespState, error) { // nolint:unparam
|
|
t.haveEventsMutex.Lock()
|
|
defer t.haveEventsMutex.Unlock()
|
|
|
|
// create a RespState response using the response to /state_ids as a guide
|
|
respState := parsedRespState{}
|
|
|
|
stateEventIDs := stateIDs.GetStateEventIDs()
|
|
authEventIDs := stateIDs.GetAuthEventIDs()
|
|
for i := range stateEventIDs {
|
|
ev, ok := t.haveEvents[stateEventIDs[i]]
|
|
if !ok {
|
|
logrus.Tracef("Missing state event in createRespStateFromStateIDs: %s", stateEventIDs[i])
|
|
continue
|
|
}
|
|
respState.StateEvents = append(respState.StateEvents, ev)
|
|
}
|
|
for i := range authEventIDs {
|
|
ev, ok := t.haveEvents[authEventIDs[i]]
|
|
if !ok {
|
|
logrus.Tracef("Missing auth event in createRespStateFromStateIDs: %s", authEventIDs[i])
|
|
continue
|
|
}
|
|
respState.AuthEvents = append(respState.AuthEvents, ev)
|
|
}
|
|
// We purposefully do not do auth checks on the returned events, as they will still
|
|
// be processed in the exact same way, just as a 'rejected' event
|
|
// TODO: Add a field to HeaderedEvent to indicate if the event is rejected.
|
|
return &respState, nil
|
|
}
|
|
|
|
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (gomatrixserverlib.PDU, error) {
|
|
trace, ctx := internal.StartRegion(ctx, "lookupEvent")
|
|
defer trace.EndRegion()
|
|
|
|
verImpl, err := gomatrixserverlib.GetRoomVersion(roomVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if localFirst {
|
|
// fetch from the roomserver
|
|
events, err := t.db.EventsFromIDs(ctx, t.roomInfo, []string{missingEventID})
|
|
if err != nil {
|
|
t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
|
|
} else if len(events) == 1 {
|
|
return events[0].PDU, nil
|
|
}
|
|
}
|
|
var event gomatrixserverlib.PDU
|
|
found := false
|
|
var validationError error
|
|
serverLoop:
|
|
for _, serverName := range t.servers {
|
|
reqctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
|
defer cancel()
|
|
txn, err := t.federation.GetEvent(reqctx, t.virtualHost, serverName, missingEventID)
|
|
if err != nil || len(txn.PDUs) == 0 {
|
|
t.log.WithError(err).WithField("missing_event_id", missingEventID).Warn("Failed to get missing /event for event ID")
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
select {
|
|
case <-reqctx.Done(): // this server took too long
|
|
continue
|
|
case <-ctx.Done(): // the input request timed out
|
|
return nil, context.DeadlineExceeded
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
event, err = verImpl.NewEventFromUntrustedJSON(txn.PDUs[0])
|
|
switch e := err.(type) {
|
|
case gomatrixserverlib.EventValidationError:
|
|
// If the event is persistable, e.g. failed validation for exceeding
|
|
// byte sizes, we can "accept" the event.
|
|
if e.Persistable {
|
|
validationError = e
|
|
found = true
|
|
break serverLoop
|
|
}
|
|
// If we can't persist the event, we probably can't do so with results
|
|
// from other servers, so also break the loop.
|
|
break serverLoop
|
|
case nil:
|
|
found = true
|
|
break serverLoop
|
|
default:
|
|
t.log.WithError(err).WithField("missing_event_id", missingEventID).Warnf("Failed to parse event JSON of event returned from /event")
|
|
continue
|
|
}
|
|
}
|
|
if !found {
|
|
t.log.WithField("missing_event_id", missingEventID).Warnf("Failed to get missing /event for event ID from %d server(s)", len(t.servers))
|
|
return nil, fmt.Errorf("wasn't able to find event via %d server(s)", len(t.servers))
|
|
}
|
|
if err := gomatrixserverlib.VerifyEventSignatures(ctx, event, t.keys, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
return t.inputer.Queryer.QueryUserIDForSender(ctx, roomID, senderID)
|
|
}); err != nil {
|
|
t.log.WithError(err).Warnf("Couldn't validate signature of event %q from /event", event.EventID())
|
|
return nil, verifySigError{event.EventID(), err}
|
|
}
|
|
return t.cacheAndReturn(event), validationError
|
|
}
|
|
|
|
func checkAllowedByState(e gomatrixserverlib.PDU, stateEvents []gomatrixserverlib.PDU, userIDForSender spec.UserIDForSender) error {
|
|
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
|
|
for i := range stateEvents {
|
|
err := authUsingState.AddEvent(stateEvents[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return gomatrixserverlib.Allowed(e, &authUsingState, userIDForSender)
|
|
}
|
|
|
|
func (t *missingStateReq) hadEvent(eventID string) {
|
|
t.hadEventsMutex.Lock()
|
|
defer t.hadEventsMutex.Unlock()
|
|
t.hadEvents[eventID] = true
|
|
}
|
|
|
|
type verifySigError struct {
|
|
eventID string
|
|
err error
|
|
}
|
|
type missingPrevEventsError struct {
|
|
eventID string
|
|
err error
|
|
}
|
|
|
|
func (e verifySigError) Error() string {
|
|
return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err)
|
|
}
|
|
func (e missingPrevEventsError) Error() string {
|
|
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
|
|
}
|