2020-01-23 17:51:10 +00:00
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"context"
2020-03-06 14:31:12 +00:00
"fmt"
2020-01-23 17:51:10 +00:00
"net/http"
"sort"
2022-08-11 16:23:35 +00:00
"time"
2020-01-23 17:51:10 +00:00
2022-08-05 12:27:27 +00:00
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/clientapi/jsonerror"
2022-04-22 09:38:29 +00:00
"github.com/matrix-org/dendrite/internal/caching"
2022-09-30 11:48:10 +00:00
"github.com/matrix-org/dendrite/internal/sqlutil"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/roomserver/api"
2020-12-02 17:41:00 +00:00
"github.com/matrix-org/dendrite/setup/config"
2022-08-11 16:23:35 +00:00
"github.com/matrix-org/dendrite/syncapi/internal"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/syncapi/storage"
2020-12-16 17:31:03 +00:00
"github.com/matrix-org/dendrite/syncapi/sync"
2020-01-23 17:51:10 +00:00
"github.com/matrix-org/dendrite/syncapi/types"
2020-10-02 16:08:13 +00:00
userapi "github.com/matrix-org/dendrite/userapi/api"
2020-01-23 17:51:10 +00:00
)
type messagesReq struct {
ctx context . Context
db storage . Database
2022-09-30 11:48:10 +00:00
snapshot storage . DatabaseTransaction
2022-05-05 08:56:03 +00:00
rsAPI api . SyncRoomserverAPI
2020-08-10 13:18:04 +00:00
cfg * config . SyncAPI
2020-01-23 17:51:10 +00:00
roomID string
2020-05-13 11:14:50 +00:00
from * types . TopologyToken
to * types . TopologyToken
2020-10-02 16:08:13 +00:00
device * userapi . Device
2020-01-23 17:51:10 +00:00
wasToProvided bool
backwardOrdering bool
2022-03-01 14:39:56 +00:00
filter * gomatrixserverlib . RoomEventFilter
2020-01-23 17:51:10 +00:00
}
type messagesResp struct {
2020-12-16 18:10:39 +00:00
Start string ` json:"start" `
2022-03-18 10:40:01 +00:00
StartStream string ` json:"start_stream,omitempty" ` // NOTSPEC: used by Cerulean, so clients can hit /messages then immediately /sync with a latest sync token
2022-06-29 08:49:12 +00:00
End string ` json:"end,omitempty" `
2020-12-16 18:10:39 +00:00
Chunk [ ] gomatrixserverlib . ClientEvent ` json:"chunk" `
2022-03-01 14:39:56 +00:00
State [ ] gomatrixserverlib . ClientEvent ` json:"state" `
2020-01-23 17:51:10 +00:00
}
// OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
func OnIncomingMessagesRequest (
2020-10-02 16:08:13 +00:00
req * http . Request , db storage . Database , roomID string , device * userapi . Device ,
2022-05-05 08:56:03 +00:00
rsAPI api . SyncRoomserverAPI ,
2020-08-10 13:18:04 +00:00
cfg * config . SyncAPI ,
2020-12-16 17:31:03 +00:00
srp * sync . RequestPool ,
2022-05-06 13:33:34 +00:00
lazyLoadCache caching . LazyLoadCache ,
2020-01-23 17:51:10 +00:00
) util . JSONResponse {
var err error
2022-09-30 11:48:10 +00:00
// NewDatabaseTransaction is used here instead of NewDatabaseSnapshot as we
// expect to be able to write to the database in response to a /messages
// request that requires backfilling from the roomserver or federation.
snapshot , err := db . NewDatabaseTransaction ( req . Context ( ) )
if err != nil {
return jsonerror . InternalServerError ( )
}
var succeeded bool
defer sqlutil . EndTransactionWithCheck ( snapshot , & succeeded , & err )
2020-11-05 10:19:23 +00:00
// check if the user has already forgotten about this room
2022-10-24 15:03:04 +00:00
membershipResp , err := getMembershipForUser ( req . Context ( ) , roomID , device . UserID , rsAPI )
2020-11-05 10:19:23 +00:00
if err != nil {
return jsonerror . InternalServerError ( )
}
2022-10-24 15:03:04 +00:00
if ! membershipResp . RoomExists {
2022-05-11 10:29:23 +00:00
return util . JSONResponse {
Code : http . StatusForbidden ,
JSON : jsonerror . Forbidden ( "room does not exist" ) ,
}
}
2020-11-05 10:19:23 +00:00
2022-10-24 15:03:04 +00:00
if membershipResp . IsRoomForgotten {
2020-11-05 10:19:23 +00:00
return util . JSONResponse {
Code : http . StatusForbidden ,
JSON : jsonerror . Forbidden ( "user already forgot about this room" ) ,
}
}
2022-03-01 14:39:56 +00:00
filter , err := parseRoomEventFilter ( req )
if err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . InvalidArgumentValue ( "unable to parse filter" ) ,
}
}
2020-01-23 17:51:10 +00:00
// Extract parameters from the request's URL.
// Pagination tokens.
2020-05-13 11:14:50 +00:00
var fromStream * types . StreamingToken
2020-12-16 17:31:03 +00:00
fromQuery := req . URL . Query ( ) . Get ( "from" )
2022-03-18 10:40:01 +00:00
toQuery := req . URL . Query ( ) . Get ( "to" )
2020-12-16 18:10:39 +00:00
emptyFromSupplied := fromQuery == ""
if emptyFromSupplied {
2020-12-16 17:31:03 +00:00
// NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided.
// We do this to allow clients to get messages without having to call `/sync` e.g Cerulean
currPos := srp . Notifier . CurrentPosition ( )
fromQuery = currPos . String ( )
}
2020-01-23 17:51:10 +00:00
// Direction to return events from.
dir := req . URL . Query ( ) . Get ( "dir" )
if dir != "b" && dir != "f" {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . MissingArgument ( "Bad or missing dir query parameter (should be either 'b' or 'f')" ) ,
}
}
// A boolean is easier to handle in this case, especially since dir is sure
// to have one of the two accepted values (so dir == "f" <=> !backwardOrdering).
backwardOrdering := ( dir == "b" )
2022-03-18 10:40:01 +00:00
from , err := types . NewTopologyTokenFromString ( fromQuery )
if err != nil {
var streamToken types . StreamingToken
if streamToken , err = types . NewStreamTokenFromString ( fromQuery ) ; err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . InvalidArgumentValue ( "Invalid from parameter: " + err . Error ( ) ) ,
}
} else {
fromStream = & streamToken
2022-09-30 11:48:10 +00:00
from , err = snapshot . StreamToTopologicalPosition ( req . Context ( ) , roomID , streamToken . PDUPosition , backwardOrdering )
2022-03-18 10:40:01 +00:00
if err != nil {
logrus . WithError ( err ) . Errorf ( "Failed to get topological position for streaming token %v" , streamToken )
return jsonerror . InternalServerError ( )
}
}
}
2020-01-23 17:51:10 +00:00
// Pagination tokens. To is optional, and its default value depends on the
// direction ("b" or "f").
2020-05-13 11:14:50 +00:00
var to types . TopologyToken
2020-01-23 17:51:10 +00:00
wasToProvided := true
2022-03-18 10:40:01 +00:00
if len ( toQuery ) > 0 {
to , err = types . NewTopologyTokenFromString ( toQuery )
2020-01-23 17:51:10 +00:00
if err != nil {
2022-03-18 10:40:01 +00:00
var streamToken types . StreamingToken
if streamToken , err = types . NewStreamTokenFromString ( toQuery ) ; err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . InvalidArgumentValue ( "Invalid to parameter: " + err . Error ( ) ) ,
}
} else {
2022-09-30 11:48:10 +00:00
to , err = snapshot . StreamToTopologicalPosition ( req . Context ( ) , roomID , streamToken . PDUPosition , ! backwardOrdering )
2022-03-18 10:40:01 +00:00
if err != nil {
logrus . WithError ( err ) . Errorf ( "Failed to get topological position for streaming token %v" , streamToken )
return jsonerror . InternalServerError ( )
}
2020-01-23 17:51:10 +00:00
}
}
} else {
// If "to" isn't provided, it defaults to either the earliest stream
// position (if we're going backward) or to the latest one (if we're
// going forward).
2022-09-30 11:48:10 +00:00
to , err = setToDefault ( req . Context ( ) , snapshot , backwardOrdering , roomID )
2020-01-23 17:51:10 +00:00
if err != nil {
2020-03-02 16:20:44 +00:00
util . GetLogger ( req . Context ( ) ) . WithError ( err ) . Error ( "setToDefault failed" )
return jsonerror . InternalServerError ( )
2020-01-23 17:51:10 +00:00
}
wasToProvided = false
}
// TODO: Implement filtering (#587)
// Check the room ID's format.
if _ , _ , err = gomatrixserverlib . SplitID ( '!' , roomID ) ; err != nil {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . MissingArgument ( "Bad room ID: " + err . Error ( ) ) ,
}
}
2022-10-24 15:03:04 +00:00
// If the user already left the room, grep events from before that
if membershipResp . Membership == gomatrixserverlib . Leave {
var token types . TopologyToken
token , err = snapshot . EventPositionInTopology ( req . Context ( ) , membershipResp . EventID )
if err != nil {
return util . JSONResponse {
Code : http . StatusInternalServerError ,
}
}
if backwardOrdering {
from = token
}
}
2020-01-23 17:51:10 +00:00
mReq := messagesReq {
ctx : req . Context ( ) ,
db : db ,
2022-09-30 11:48:10 +00:00
snapshot : snapshot ,
2020-05-01 09:48:17 +00:00
rsAPI : rsAPI ,
2020-01-23 17:51:10 +00:00
cfg : cfg ,
roomID : roomID ,
2020-05-13 11:14:50 +00:00
from : & from ,
to : & to ,
2020-01-23 17:51:10 +00:00
wasToProvided : wasToProvided ,
2022-03-01 14:39:56 +00:00
filter : filter ,
2020-01-23 17:51:10 +00:00
backwardOrdering : backwardOrdering ,
2020-10-02 16:08:13 +00:00
device : device ,
2020-01-23 17:51:10 +00:00
}
clientEvents , start , end , err := mReq . retrieveEvents ( )
if err != nil {
2020-03-02 16:20:44 +00:00
util . GetLogger ( req . Context ( ) ) . WithError ( err ) . Error ( "mreq.retrieveEvents failed" )
return jsonerror . InternalServerError ( )
2020-01-23 17:51:10 +00:00
}
2020-06-17 16:41:45 +00:00
2020-03-24 12:20:10 +00:00
util . GetLogger ( req . Context ( ) ) . WithFields ( logrus . Fields {
"from" : from . String ( ) ,
"to" : to . String ( ) ,
2022-03-01 14:39:56 +00:00
"limit" : filter . Limit ,
2020-03-24 12:20:10 +00:00
"backwards" : backwardOrdering ,
"return_start" : start . String ( ) ,
"return_end" : end . String ( ) ,
} ) . Info ( "Responding" )
2020-01-23 17:51:10 +00:00
2020-12-16 18:10:39 +00:00
res := messagesResp {
Chunk : clientEvents ,
Start : start . String ( ) ,
End : end . String ( ) ,
2022-06-29 08:49:12 +00:00
}
2022-09-30 11:48:10 +00:00
res . applyLazyLoadMembers ( req . Context ( ) , snapshot , roomID , device , filter . LazyLoadMembers , lazyLoadCache )
2022-06-29 08:49:12 +00:00
// If we didn't return any events, set the end to an empty string, so it will be omitted
// in the response JSON.
if len ( res . Chunk ) == 0 {
res . End = ""
2020-12-16 18:10:39 +00:00
}
2022-03-18 10:40:01 +00:00
if fromStream != nil {
2020-12-16 18:10:39 +00:00
res . StartStream = fromStream . String ( )
}
2020-01-23 17:51:10 +00:00
// Respond with the events.
2022-09-30 11:48:10 +00:00
succeeded = true
2020-01-23 17:51:10 +00:00
return util . JSONResponse {
Code : http . StatusOK ,
2020-12-16 18:10:39 +00:00
JSON : res ,
2020-01-23 17:51:10 +00:00
}
}
2022-06-29 08:49:12 +00:00
// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has
// LazyLoadMembers enabled.
func ( m * messagesResp ) applyLazyLoadMembers (
ctx context . Context ,
2022-09-30 11:48:10 +00:00
db storage . DatabaseTransaction ,
2022-06-29 08:49:12 +00:00
roomID string ,
device * userapi . Device ,
lazyLoad bool ,
lazyLoadCache caching . LazyLoadCache ,
) {
if ! lazyLoad {
return
}
membershipToUser := make ( map [ string ] * gomatrixserverlib . HeaderedEvent )
for _ , evt := range m . Chunk {
// Don't add membership events the client should already know about
if _ , cached := lazyLoadCache . IsLazyLoadedUserCached ( device , roomID , evt . Sender ) ; cached {
continue
}
membership , err := db . GetStateEvent ( ctx , roomID , gomatrixserverlib . MRoomMember , evt . Sender )
if err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "failed to get membership event for user" )
continue
}
if membership != nil {
membershipToUser [ evt . Sender ] = membership
lazyLoadCache . StoreLazyLoadedUser ( device , roomID , evt . Sender , membership . EventID ( ) )
}
}
for _ , evt := range membershipToUser {
2022-08-05 12:27:27 +00:00
m . State = append ( m . State , gomatrixserverlib . HeaderedToClientEvent ( evt , gomatrixserverlib . FormatAll ) )
2022-06-29 08:49:12 +00:00
}
}
2022-10-24 15:03:04 +00:00
func getMembershipForUser ( ctx context . Context , roomID , userID string , rsAPI api . SyncRoomserverAPI ) ( resp api . QueryMembershipForUserResponse , err error ) {
2020-11-05 10:19:23 +00:00
req := api . QueryMembershipForUserRequest {
RoomID : roomID ,
UserID : userID ,
}
if err := rsAPI . QueryMembershipForUser ( ctx , & req , & resp ) ; err != nil {
2022-10-24 15:03:04 +00:00
return api . QueryMembershipForUserResponse { } , err
2020-11-05 10:19:23 +00:00
}
2022-10-24 15:03:04 +00:00
return resp , nil
2020-11-05 10:19:23 +00:00
}
2020-09-27 21:23:42 +00:00
// retrieveEvents retrieves events from the local database for a request on
2020-01-23 17:51:10 +00:00
// /messages. If there's not enough events to retrieve, it asks another
// homeserver in the room for older events.
// Returns an error if there was an issue talking to the database or with the
// remote homeserver.
func ( r * messagesReq ) retrieveEvents ( ) (
clientEvents [ ] gomatrixserverlib . ClientEvent , start ,
2020-05-13 11:14:50 +00:00
end types . TopologyToken , err error ,
2020-01-23 17:51:10 +00:00
) {
// Retrieve the events from the local database.
2022-09-30 11:48:10 +00:00
streamEvents , err := r . snapshot . GetEventsInTopologicalRange ( r . ctx , r . from , r . to , r . roomID , r . filter , r . backwardOrdering )
2020-01-23 17:51:10 +00:00
if err != nil {
2020-03-18 12:48:51 +00:00
err = fmt . Errorf ( "GetEventsInRange: %w" , err )
2020-01-23 17:51:10 +00:00
return
}
2020-11-16 15:44:53 +00:00
var events [ ] * gomatrixserverlib . HeaderedEvent
2022-10-24 15:03:04 +00:00
util . GetLogger ( r . ctx ) . WithFields ( logrus . Fields {
"start" : r . from ,
"end" : r . to ,
"backwards" : r . backwardOrdering ,
} ) . Infof ( "Fetched %d events locally" , len ( streamEvents ) )
2020-01-23 17:51:10 +00:00
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
// on the ordering), or we've reached a backward extremity.
if len ( streamEvents ) == 0 {
if events , err = r . handleEmptyEventsSlice ( ) ; err != nil {
return
}
} else {
if events , err = r . handleNonEmptyEventsSlice ( streamEvents ) ; err != nil {
return
}
}
// If we didn't get any event, we don't need to proceed any further.
if len ( events ) == 0 {
2020-05-13 11:14:50 +00:00
return [ ] gomatrixserverlib . ClientEvent { } , * r . from , * r . to , nil
2020-01-23 17:51:10 +00:00
}
2021-01-13 12:59:29 +00:00
// Get the position of the first and the last event in the room's topology.
// This position is currently determined by the event's depth, so we could
// also use it instead of retrieving from the database. However, if we ever
// change the way topological positions are defined (as depth isn't the most
// reliable way to define it), it would be easier and less troublesome to
// only have to change it in one place, i.e. the database.
start , end , err = r . getStartEnd ( events )
2022-08-11 16:23:35 +00:00
if err != nil {
return [ ] gomatrixserverlib . ClientEvent { } , * r . from , * r . to , err
}
2021-01-13 12:59:29 +00:00
2020-04-15 15:10:18 +00:00
// Sort the events to ensure we send them in the right order.
2020-01-23 17:51:10 +00:00
if r . backwardOrdering {
2020-04-15 15:10:18 +00:00
// This reverses the array from old->new to new->old
2020-11-16 15:44:53 +00:00
reversed := func ( in [ ] * gomatrixserverlib . HeaderedEvent ) [ ] * gomatrixserverlib . HeaderedEvent {
out := make ( [ ] * gomatrixserverlib . HeaderedEvent , len ( in ) )
2020-05-01 15:41:13 +00:00
for i := 0 ; i < len ( in ) ; i ++ {
out [ i ] = in [ len ( in ) - i - 1 ]
}
return out
}
events = reversed ( events )
2020-01-23 17:51:10 +00:00
}
2020-10-02 16:08:13 +00:00
if len ( events ) == 0 {
return [ ] gomatrixserverlib . ClientEvent { } , * r . from , * r . to , nil
}
2020-01-23 17:51:10 +00:00
2022-08-11 16:23:35 +00:00
// Apply room history visibility filter
startTime := time . Now ( )
2022-09-30 11:48:10 +00:00
filteredEvents , err := internal . ApplyHistoryVisibilityFilter ( r . ctx , r . snapshot , r . rsAPI , events , nil , r . device . UserID , "messages" )
2022-08-11 16:23:35 +00:00
logrus . WithFields ( logrus . Fields {
2022-08-19 09:04:26 +00:00
"duration" : time . Since ( startTime ) ,
"room_id" : r . roomID ,
"events_before" : len ( events ) ,
"events_after" : len ( filteredEvents ) ,
2022-08-11 16:23:35 +00:00
} ) . Debug ( "applied history visibility (messages)" )
return gomatrixserverlib . HeaderedToClientEvents ( filteredEvents , gomatrixserverlib . FormatAll ) , start , end , err
2020-10-02 16:08:13 +00:00
}
2020-11-16 15:44:53 +00:00
func ( r * messagesReq ) getStartEnd ( events [ ] * gomatrixserverlib . HeaderedEvent ) ( start , end types . TopologyToken , err error ) {
2021-01-13 12:59:29 +00:00
if r . backwardOrdering {
start = * r . from
if events [ len ( events ) - 1 ] . Type ( ) == gomatrixserverlib . MRoomCreate {
// NOTSPEC: We've hit the beginning of the room so there's really nowhere
2021-07-20 08:45:40 +00:00
// else to go. This seems to fix Element iOS from looping on /messages endlessly.
2021-01-13 12:59:29 +00:00
end = types . TopologyToken { }
} else {
2022-09-30 11:48:10 +00:00
end , err = r . snapshot . EventPositionInTopology (
2021-01-13 12:59:29 +00:00
r . ctx , events [ 0 ] . EventID ( ) ,
)
2020-06-17 16:41:45 +00:00
// A stream/topological position is a cursor located between two events.
// While they are identified in the code by the event on their right (if
// we consider a left to right chronological order), tokens need to refer
// to them by the event on their left, therefore we need to decrement the
// end position we send in the response if we're going backward.
end . Decrement ( )
}
2021-01-13 12:59:29 +00:00
} else {
start = * r . from
2022-09-30 11:48:10 +00:00
end , err = r . snapshot . EventPositionInTopology (
2021-01-13 12:59:29 +00:00
r . ctx , events [ len ( events ) - 1 ] . EventID ( ) ,
)
}
if err != nil {
err = fmt . Errorf ( "EventPositionInTopology: for end event %s: %w" , events [ len ( events ) - 1 ] . EventID ( ) , err )
return
2020-01-23 17:51:10 +00:00
}
2020-06-17 16:41:45 +00:00
return
2020-01-23 17:51:10 +00:00
}
// handleEmptyEventsSlice handles the case where the initial request to the
// database returned an empty slice of events. It does so by checking whether
// the set is empty because we've reached a backward extremity, and if that is
// the case, by retrieving as much events as requested by backfilling from
// another homeserver.
// Returns an error if there was an issue talking with the database or
// backfilling.
func ( r * messagesReq ) handleEmptyEventsSlice ( ) (
2020-11-16 15:44:53 +00:00
events [ ] * gomatrixserverlib . HeaderedEvent , err error ,
2020-01-23 17:51:10 +00:00
) {
2022-09-30 11:48:10 +00:00
backwardExtremities , err := r . snapshot . BackwardExtremitiesForRoom ( r . ctx , r . roomID )
2020-01-23 17:51:10 +00:00
// Check if we have backward extremities for this room.
if len ( backwardExtremities ) > 0 {
// If so, retrieve as much events as needed through backfilling.
2022-03-01 14:39:56 +00:00
events , err = r . backfill ( r . roomID , backwardExtremities , r . filter . Limit )
2020-01-23 17:51:10 +00:00
if err != nil {
return
}
} else {
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
2020-11-16 15:44:53 +00:00
events = [ ] * gomatrixserverlib . HeaderedEvent { }
2020-01-23 17:51:10 +00:00
}
return
}
// handleNonEmptyEventsSlice handles the case where the initial request to the
// database returned a non-empty slice of events. It does so by checking whether
// events are missing from the expected result, and retrieve missing events
// through backfilling if needed.
// Returns an error if there was an issue while backfilling.
func ( r * messagesReq ) handleNonEmptyEventsSlice ( streamEvents [ ] types . StreamEvent ) (
2020-11-16 15:44:53 +00:00
events [ ] * gomatrixserverlib . HeaderedEvent , err error ,
2020-01-23 17:51:10 +00:00
) {
// Check if we have enough events.
2022-03-01 14:39:56 +00:00
isSetLargeEnough := len ( streamEvents ) >= r . filter . Limit
2020-03-24 12:20:10 +00:00
if ! isSetLargeEnough {
// it might be fine we don't have up to 'limit' events, let's find out
2020-01-23 17:51:10 +00:00
if r . backwardOrdering {
if r . wasToProvided {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
streamPos := types . StreamPosition ( streamEvents [ len ( streamEvents ) - 1 ] . StreamPosition )
2020-12-10 18:57:10 +00:00
isSetLargeEnough = ( r . to . PDUPosition - 1 == streamPos )
2020-01-23 17:51:10 +00:00
}
} else {
streamPos := types . StreamPosition ( streamEvents [ 0 ] . StreamPosition )
2020-12-10 18:57:10 +00:00
isSetLargeEnough = ( r . from . PDUPosition - 1 == streamPos )
2020-01-23 17:51:10 +00:00
}
}
// Check if the slice contains a backward extremity.
2022-09-30 11:48:10 +00:00
backwardExtremities , err := r . snapshot . BackwardExtremitiesForRoom ( r . ctx , r . roomID )
2020-01-23 17:51:10 +00:00
if err != nil {
return
}
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if len ( backwardExtremities ) > 0 && ! isSetLargeEnough && r . backwardOrdering {
2020-11-16 15:44:53 +00:00
var pdus [ ] * gomatrixserverlib . HeaderedEvent
2020-01-23 17:51:10 +00:00
// Only ask the remote server for enough events to reach the limit.
2022-03-01 14:39:56 +00:00
pdus , err = r . backfill ( r . roomID , backwardExtremities , r . filter . Limit - len ( streamEvents ) )
2020-01-23 17:51:10 +00:00
if err != nil {
return
}
// Append the PDUs to the list to send back to the client.
events = append ( events , pdus ... )
}
// Append the events ve previously retrieved locally.
2022-09-30 11:48:10 +00:00
events = append ( events , r . snapshot . StreamEventsToEvents ( nil , streamEvents ) ... )
2020-05-01 15:41:13 +00:00
sort . Sort ( eventsByDepth ( events ) )
2020-01-23 17:51:10 +00:00
return
}
2020-11-16 15:44:53 +00:00
type eventsByDepth [ ] * gomatrixserverlib . HeaderedEvent
2020-05-01 15:41:13 +00:00
func ( e eventsByDepth ) Len ( ) int {
return len ( e )
}
func ( e eventsByDepth ) Swap ( i , j int ) {
e [ i ] , e [ j ] = e [ j ] , e [ i ]
}
func ( e eventsByDepth ) Less ( i , j int ) bool {
return e [ i ] . Depth ( ) < e [ j ] . Depth ( )
}
2020-01-23 17:51:10 +00:00
// backfill performs a backfill request over the federation on another
// homeserver in the room.
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
// It also stores the PDUs retrieved from the remote homeserver's response to
// the database.
// Returns with an empty string if the remote homeserver didn't return with any
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
2020-11-16 15:44:53 +00:00
func ( r * messagesReq ) backfill ( roomID string , backwardsExtremities map [ string ] [ ] string , limit int ) ( [ ] * gomatrixserverlib . HeaderedEvent , error ) {
2020-06-11 18:50:40 +00:00
var res api . PerformBackfillResponse
err := r . rsAPI . PerformBackfill ( context . Background ( ) , & api . PerformBackfillRequest {
2020-05-20 15:04:31 +00:00
RoomID : roomID ,
BackwardsExtremities : backwardsExtremities ,
Limit : limit ,
ServerName : r . cfg . Matrix . ServerName ,
2022-11-15 15:05:23 +00:00
VirtualHost : r . device . UserDomain ( ) ,
2020-04-28 10:46:47 +00:00
} , & res )
2020-03-24 12:20:10 +00:00
if err != nil {
2020-06-11 18:50:40 +00:00
return nil , fmt . Errorf ( "PerformBackfill failed: %w" , err )
2020-03-24 12:20:10 +00:00
}
2020-04-28 10:46:47 +00:00
util . GetLogger ( r . ctx ) . WithField ( "new_events" , len ( res . Events ) ) . Info ( "Storing new events from backfill" )
2020-03-24 12:20:10 +00:00
2020-04-28 10:46:47 +00:00
// TODO: we should only be inserting events into the database from the roomserver's kafka output stream.
// Currently, this can race with live events for the room and cause problems. It's also just a bit unclear
// when you have multiple entry points to write events.
2020-03-24 12:20:10 +00:00
2020-05-01 15:41:13 +00:00
// we have to order these by depth, starting with the lowest because otherwise the topology tokens
// will skip over events that have the same depth but different stream positions due to the query which is:
// - anything less than the depth OR
// - anything with the same depth and a lower stream position.
sort . Sort ( eventsByDepth ( res . Events ) )
2020-03-24 12:20:10 +00:00
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
2022-08-19 09:04:26 +00:00
if res . HistoryVisibility == "" {
res . HistoryVisibility = gomatrixserverlib . HistoryVisibilityShared
}
2020-04-28 10:46:47 +00:00
for i := range res . Events {
2020-05-01 15:41:13 +00:00
_ , err = r . db . WriteEvent (
2020-10-05 10:06:31 +00:00
context . Background ( ) ,
2020-11-16 15:44:53 +00:00
res . Events [ i ] ,
[ ] * gomatrixserverlib . HeaderedEvent { } ,
2020-03-24 12:20:10 +00:00
[ ] string { } ,
[ ] string { } ,
nil , true ,
2022-08-19 09:04:26 +00:00
res . HistoryVisibility ,
2020-05-01 15:41:13 +00:00
)
if err != nil {
2020-03-24 12:20:10 +00:00
return nil , err
}
}
2020-05-19 17:42:55 +00:00
// we may have got more than the requested limit so resize now
events := res . Events
if len ( events ) > limit {
// last `limit` events
events = events [ len ( events ) - limit : ]
}
2022-08-19 09:04:26 +00:00
for _ , ev := range events {
ev . Visibility = res . HistoryVisibility
}
2020-05-19 17:42:55 +00:00
return events , nil
2020-01-23 17:51:10 +00:00
}
// setToDefault returns the default value for the "to" query parameter of a
// request to /messages if not provided. It defaults to either the earliest
// topological position (if we're going backward) or to the latest one (if we're
// going forward).
// Returns an error if there was an issue with retrieving the latest position
// from the database
func setToDefault (
2022-09-30 11:48:10 +00:00
ctx context . Context , snapshot storage . DatabaseTransaction , backwardOrdering bool ,
2020-01-23 17:51:10 +00:00
roomID string ,
2020-05-13 11:14:50 +00:00
) ( to types . TopologyToken , err error ) {
2020-01-23 17:51:10 +00:00
if backwardOrdering {
2020-03-24 12:20:10 +00:00
// go 1 earlier than the first event so we correctly fetch the earliest event
2020-05-14 16:30:16 +00:00
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
2020-12-10 18:57:10 +00:00
to = types . TopologyToken { }
2020-01-23 17:51:10 +00:00
} else {
2022-09-30 11:48:10 +00:00
to , err = snapshot . MaxTopologicalPosition ( ctx , roomID )
2020-01-23 17:51:10 +00:00
}
return
}