Implement /context (#2207)

* Add QueryEventsAfter

* Add /context

* Make all tests pass on sqlite

* Add queries to get the events for /context requests

* Move /context to the syncapi

* Revert "Add QueryEventsAfter"

This reverts commit 440a771d10.

* Simplify getting the required events

* Apply RoomEventFilter when getting events

* Add passing tests

* Remove logging

* Remove unused SQL statements
Update comments & add TODO
This commit is contained in:
S7evinK 2022-02-21 17:12:22 +01:00 committed by GitHub
parent 280e9b19a1
commit cf525d1f61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 533 additions and 60 deletions

190
syncapi/routing/context.go Normal file
View file

@ -0,0 +1,190 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"database/sql"
"encoding/json"
"net/http"
"strconv"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
type ContextRespsonse struct {
End string `json:"end"`
Event gomatrixserverlib.ClientEvent `json:"event"`
EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"`
EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"`
Start string `json:"start"`
State []gomatrixserverlib.ClientEvent `json:"state"`
}
func Context(
req *http.Request, device *userapi.Device,
rsAPI roomserver.RoomserverInternalAPI,
syncDB storage.Database,
roomID, eventID string,
) util.JSONResponse {
filter, err := parseContextParams(req)
if err != nil {
errMsg := ""
switch err.(type) {
case *json.InvalidUnmarshalError:
errMsg = "unable to parse filter"
case *strconv.NumError:
errMsg = "unable to parse limit"
}
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam(errMsg),
Headers: nil,
}
}
filter.Rooms = append(filter.Rooms, roomID)
ctx := req.Context()
membershipRes := roomserver.QueryMembershipForUserResponse{}
membershipReq := roomserver.QueryMembershipForUserRequest{UserID: device.UserID, RoomID: roomID}
if err := rsAPI.QueryMembershipForUser(ctx, &membershipReq, &membershipRes); err != nil {
logrus.WithError(err).Error("unable to fo membership")
return jsonerror.InternalServerError()
}
stateFilter := gomatrixserverlib.StateFilter{
Limit: 100,
NotSenders: filter.NotSenders,
NotTypes: filter.NotTypes,
Senders: filter.Senders,
Types: filter.Types,
LazyLoadMembers: filter.LazyLoadMembers,
IncludeRedundantMembers: filter.IncludeRedundantMembers,
NotRooms: filter.NotRooms,
Rooms: filter.Rooms,
ContainsURL: filter.ContainsURL,
}
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
// verify the user is allowed to see the context for this room/event
for _, x := range state {
hisVis, err := x.HistoryVisibility()
if err != nil {
continue
}
allowed := hisVis != "world_readable" && membershipRes.Membership == "join"
if !allowed {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("User is not allowed to query context"),
}
}
}
id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID)
if err != nil {
logrus.WithError(err).WithField("eventID", eventID).Error("unable to find requested event")
return jsonerror.InternalServerError()
}
eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch before events")
return jsonerror.InternalServerError()
}
_, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch after events")
return jsonerror.InternalServerError()
}
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll)
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll)
newState := applyLazyLoadMembers(filter, eventsAfterClient, eventsBeforeClient, state)
response := ContextRespsonse{
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient,
State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll),
}
if len(response.State) > filter.Limit {
response.State = response.State[len(response.State)-filter.Limit:]
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: response,
}
}
func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent, state []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
if filter == nil || !filter.LazyLoadMembers {
return state
}
allEvents := append(eventsBefore, eventsAfter...)
x := make(map[string]bool)
// get members who actually send an event
for _, e := range allEvents {
x[e.Sender] = true
}
newState := []*gomatrixserverlib.HeaderedEvent{}
for _, event := range state {
if event.Type() != gomatrixserverlib.MRoomMember {
newState = append(newState, event)
} else {
// did the user send an event?
if x[event.Sender()] {
newState = append(newState, event)
}
}
}
return newState
}
func parseContextParams(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
// Default room filter
filter := &gomatrixserverlib.RoomEventFilter{Limit: 10}
l := req.URL.Query().Get("limit")
f := req.URL.Query().Get("filter")
if l != "" {
limit, err := strconv.Atoi(l)
if err != nil {
return nil, err
}
// NOTSPEC: feels like a good idea to have an upper bound limit
if limit > 100 {
limit = 100
}
filter.Limit = limit
}
if f != "" {
if err := json.Unmarshal([]byte(f), &filter); err != nil {
return nil, err
}
}
return filter, nil
}

View file

@ -0,0 +1,68 @@
package routing
import (
"net/http"
"reflect"
"testing"
"github.com/matrix-org/gomatrixserverlib"
)
func Test_parseContextParams(t *testing.T) {
noParamsReq, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI", nil)
limit2Req, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI&limit=2", nil)
limit10000Req, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI&limit=10000", nil)
invalidLimitReq, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI&limit=100as", nil)
lazyLoadReq, _ := http.NewRequest("GET", "https://localhost:8800//_matrix/client/r0/rooms/!kvEtX3rFamfwKHO3:localhost:8800/context/%24GjmkRbajRHy8_cxcSbUU4qF_njV8yHeLphI2azTrPaI?limit=2&filter=%7B+%22lazy_load_members%22+%3A+true+%7D&access_token=t1Njzm74w3G40CJ5xrlf1V2haXom0z0Iq1qyyVWhbVo", nil)
invalidFilterReq, _ := http.NewRequest("GET", "https://localhost:8800//_matrix/client/r0/rooms/!kvEtX3rFamfwKHO3:localhost:8800/context/%24GjmkRbajRHy8_cxcSbUU4qF_njV8yHeLphI2azTrPaI?limit=2&filter=%7B+%22lazy_load_members%22+%3A+true&access_token=t1Njzm74w3G40CJ5xrlf1V2haXom0z0Iq1qyyVWhbVo", nil)
tests := []struct {
name string
req *http.Request
wantFilter *gomatrixserverlib.RoomEventFilter
wantErr bool
}{
{
name: "no params set",
req: noParamsReq,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 10},
},
{
name: "limit 2 param set",
req: limit2Req,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 2},
},
{
name: "limit 10000 param set",
req: limit10000Req,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 100},
},
{
name: "filter lazy_load_members param set",
req: lazyLoadReq,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 2, LazyLoadMembers: true},
},
{
name: "invalid limit req",
req: invalidLimitReq,
wantErr: true,
},
{
name: "invalid filter req",
req: invalidFilterReq,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFilter, err := parseContextParams(tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("parseContextParams() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotFilter, tt.wantFilter) {
t.Errorf("parseContextParams() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
}
})
}
}

View file

@ -77,4 +77,19 @@ func Setup(
v3mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { v3mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return srp.OnIncomingKeyChangeRequest(req, device) return srp.OnIncomingKeyChangeRequest(req, device)
})).Methods(http.MethodGet, http.MethodOptions) })).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/rooms/{roomId}/context/{eventId}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return Context(
req, device,
rsAPI, syncDB,
vars["roomId"], vars["eventId"],
)
}),
).Methods(http.MethodGet, http.MethodOptions)
} }

View file

@ -137,4 +137,8 @@ type Database interface {
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
} }

View file

@ -130,6 +130,25 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectContextEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
const selectContextBeforeEventSQL = "" +
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $3"
const selectContextAfterEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
@ -140,6 +159,9 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -148,34 +170,20 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertEventStmt, insertEventSQL},
} {&s.selectEventsStmt, selectEventsSQL},
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { {&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
return nil, err {&s.selectRecentEventsStmt, selectRecentEventsSQL},
} {&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { {&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
return nil, err {&s.selectStateInRangeStmt, selectStateInRangeSQL},
} {&s.updateEventJSONStmt, updateEventJSONSQL},
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { {&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
return nil, err {&s.selectContextEventStmt, selectContextEventSQL},
} {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
return nil, err }.Prepare(db)
}
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
return nil, err
}
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
@ -436,6 +444,84 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err return err
} }
func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
if err = row.Scan(&id, &eventAsString); err != nil {
return 0, evt, err
}
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
return 0, evt, err
}
return id, evt, nil
}
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
pq.StringArray(filter.Senders),
pq.StringArray(filter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&eventBytes); err != nil {
return evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return evts, err
}
evts = append(evts, evt)
}
return evts, rows.Err()
}
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
pq.StringArray(filter.Senders),
pq.StringArray(filter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&lastID, &eventBytes); err != nil {
return 0, evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return 0, evts, err
}
evts = append(evts, evt)
}
return lastID, evts, rows.Err()
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent var result []types.StreamEvent
for rows.Next() { for rows.Next() {

View file

@ -955,3 +955,14 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream
_, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
return receipts, err return receipts, err
} }
func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
}
func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
}
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}

View file

@ -68,7 +68,8 @@ const selectRoomIDsWithMembershipSQL = "" +
const selectCurrentStateSQL = "" + const selectCurrentStateSQL = "" +
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
const selectJoinedUsersSQL = "" + const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"

View file

@ -62,17 +62,17 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" " WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" + const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" + const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" " WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -85,19 +85,33 @@ const selectStateInRangeSQL = "" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" + " WHERE (id > $1 AND id <= $2)" +
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectContextEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
const selectContextBeforeEventSQL = "" +
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectContextAfterEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *streamIDStatements streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -109,22 +123,16 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertEventStmt, insertEventSQL},
} {&s.selectEventsStmt, selectEventsSQL},
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { {&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
return nil, err {&s.updateEventJSONStmt, updateEventJSONSQL},
} {&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { {&s.selectContextEventStmt, selectContextEventSQL},
return nil, err {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
} {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil { }.Prepare(db)
return nil, err
}
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
@ -462,6 +470,91 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
} }
return result, nil return result, nil
} }
func (s *outputRoomEventsStatements) SelectContextEvent(
ctx context.Context, txn *sql.Tx, roomID, eventID string,
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
if err = row.Scan(&id, &eventAsString); err != nil {
return 0, evt, err
}
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
return 0, evt, err
}
return id, evt, nil
}
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextBeforeEventSQL,
[]interface{}{
roomID, id,
},
filter.Senders, filter.NotSenders,
filter.Types, filter.NotTypes,
nil, filter.Limit, FilterOrderDesc,
)
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&eventBytes); err != nil {
return evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return evts, err
}
evts = append(evts, evt)
}
return evts, rows.Err()
}
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextAfterEventSQL,
[]interface{}{
roomID, id,
},
filter.Senders, filter.NotSenders,
filter.Types, filter.NotTypes,
nil, filter.Limit, FilterOrderAsc,
)
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&lastID, &eventBytes); err != nil {
return 0, evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return 0, evts, err
}
evts = append(evts, evt)
}
return lastID, evts, rows.Err()
}
func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) { func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) {
if len(addIDsJSON) > 0 { if len(addIDsJSON) > 0 {

View file

@ -63,6 +63,10 @@ type Events interface {
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
} }
// Topology keeps track of the depths and stream positions for all events. // Topology keeps track of the depths and stream positions for all events.

View file

@ -24,11 +24,8 @@ Local device key changes get to remote servers with correct prev_id
# Flakey # Flakey
Local device key changes appear in /keys/changes Local device key changes appear in /keys/changes
Device list doesn't change if remote server is down
# we don't support groups # we don't support groups
Remove group category Remove group category
Remove group role Remove group role
# See https://github.com/matrix-org/sytest/pull/1142
Device list doesn't change if remote server is down

View file

@ -592,4 +592,8 @@ Forward extremities remain so even after the next events are populated as outlie
If a device list update goes missing, the server resyncs on the next one If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation uploading self-signing key notifies over federation
uploading signed devices gets propagated over federation uploading signed devices gets propagated over federation
Device list doesn't change if remote server is down Device list doesn't change if remote server is down
/context/ on joined room works
/context/ on non world readable room does not work
/context/ returns correct number of events
/context/ with lazy_load_members filter works