Hit the database far less in Events to find room NIDs and room versions (#1643)

* Hit the database far less to find room NIDs for event NIDs

* Close the rows

* Fix SQLite selectRoomNIDsForEventNIDsSQL

* Give same treatment to room version lookups
This commit is contained in:
Neil Alexander 2020-12-16 10:33:28 +00:00 committed by GitHub
parent d2bcc5f746
commit 9057143033
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 65 deletions

View file

@ -120,8 +120,8 @@ const bulkSelectEventNIDSQL = "" +
const selectMaxEventDepthSQL = "" + const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)" "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
const selectRoomNIDForEventNIDSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT room_nid FROM roomserver_events WHERE event_nid = $1" "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)"
type eventStatements struct { type eventStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
@ -137,7 +137,7 @@ type eventStatements struct {
bulkSelectEventIDStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt
selectMaxEventDepthStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt
selectRoomNIDForEventNIDStmt *sql.Stmt selectRoomNIDsForEventNIDsStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -161,7 +161,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
{&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL},
{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -432,11 +432,24 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx,
return result, nil return result, nil
} }
func (s *eventStatements) SelectRoomNIDForEventNID( func (s *eventStatements) SelectRoomNIDsForEventNIDs(
ctx context.Context, eventNID types.EventNID, ctx context.Context, eventNIDs []types.EventNID,
) (roomNID types.RoomNID, err error) { ) (map[types.EventNID]types.RoomNID, error) {
err = s.selectRoomNIDForEventNIDStmt.QueryRowContext(ctx, int64(eventNID)).Scan(&roomNID) rows, err := s.selectRoomNIDsForEventNIDsStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs))
return if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
result := make(map[types.EventNID]types.RoomNID)
for rows.Next() {
var eventNID types.EventNID
var roomNID types.RoomNID
if err = rows.Scan(&eventNID, &roomNID); err != nil {
return nil, err
}
result[eventNID] = roomNID
}
return result, nil
} }
func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {

View file

@ -18,7 +18,6 @@ package postgres
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -69,8 +68,8 @@ const selectLatestEventNIDsForUpdateSQL = "" +
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
"UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1" "UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1"
const selectRoomVersionForRoomNIDSQL = "" + const selectRoomVersionsForRoomNIDsSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" "SELECT room_nid, room_version FROM roomserver_rooms WHERE room_nid = ANY($1)"
const selectRoomInfoSQL = "" + const selectRoomInfoSQL = "" +
"SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1" "SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1"
@ -90,7 +89,7 @@ type roomStatements struct {
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
selectRoomVersionForRoomNIDStmt *sql.Stmt selectRoomVersionsForRoomNIDsStmt *sql.Stmt
selectRoomInfoStmt *sql.Stmt selectRoomInfoStmt *sql.Stmt
selectRoomIDsStmt *sql.Stmt selectRoomIDsStmt *sql.Stmt
bulkSelectRoomIDsStmt *sql.Stmt bulkSelectRoomIDsStmt *sql.Stmt
@ -109,7 +108,7 @@ func NewPostgresRoomsTable(db *sql.DB) (tables.Rooms, error) {
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
{&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, {&s.selectRoomVersionsForRoomNIDsStmt, selectRoomVersionsForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL}, {&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL}, {&s.selectRoomIDsStmt, selectRoomIDsSQL},
{&s.bulkSelectRoomIDsStmt, bulkSelectRoomIDsSQL}, {&s.bulkSelectRoomIDsStmt, bulkSelectRoomIDsSQL},
@ -219,15 +218,24 @@ func (s *roomStatements) UpdateLatestEventNIDs(
return err return err
} }
func (s *roomStatements) SelectRoomVersionForRoomNID( func (s *roomStatements) SelectRoomVersionsForRoomNIDs(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNIDs []types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (map[types.RoomNID]gomatrixserverlib.RoomVersion, error) {
var roomVersion gomatrixserverlib.RoomVersion rows, err := s.selectRoomVersionsForRoomNIDsStmt.QueryContext(ctx, roomNIDsAsArray(roomNIDs))
err := s.selectRoomVersionForRoomNIDStmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion) if err != nil {
if err == sql.ErrNoRows { return nil, err
return roomVersion, errors.New("room not found")
} }
return roomVersion, err defer internal.CloseAndLogIfError(ctx, rows, "selectRoomVersionsForRoomNIDsStmt: rows.close() failed")
result := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
for rows.Next() {
var roomNID types.RoomNID
var roomVersion gomatrixserverlib.RoomVersion
if err = rows.Scan(&roomNID, &roomVersion); err != nil {
return nil, err
}
result[roomNID] = roomVersion
}
return result, nil
} }
func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) { func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) {
@ -271,3 +279,11 @@ func (s *roomStatements) BulkSelectRoomNIDs(ctx context.Context, roomIDs []strin
} }
return roomNIDs, nil return roomNIDs, nil
} }
func roomNIDsAsArray(roomNIDs []types.RoomNID) pq.Int64Array {
nids := make([]int64, len(roomNIDs))
for i := range roomNIDs {
nids[i] = int64(roomNIDs[i])
}
return nids
}

View file

@ -313,25 +313,29 @@ func (d *Database) Events(
if err != nil { if err != nil {
eventIDs = map[types.EventNID]string{} eventIDs = map[types.EventNID]string{}
} }
var roomNIDs map[types.EventNID]types.RoomNID
roomNIDs, err = d.EventsTable.SelectRoomNIDsForEventNIDs(ctx, eventNIDs)
if err != nil {
return nil, err
}
uniqueRoomNIDs := make(map[types.RoomNID]struct{})
for _, n := range roomNIDs {
uniqueRoomNIDs[n] = struct{}{}
}
roomNIDList := make([]types.RoomNID, 0, len(uniqueRoomNIDs))
for n := range uniqueRoomNIDs {
roomNIDList = append(roomNIDList, n)
}
roomVersions, err := d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, roomNIDList)
if err != nil {
return nil, err
}
results := make([]types.Event, len(eventJSONs)) results := make([]types.Event, len(eventJSONs))
for i, eventJSON := range eventJSONs { for i, eventJSON := range eventJSONs {
var roomNID types.RoomNID
var roomVersion gomatrixserverlib.RoomVersion
result := &results[i] result := &results[i]
result.EventNID = eventJSON.EventNID result.EventNID = eventJSON.EventNID
roomNID, err = d.EventsTable.SelectRoomNIDForEventNID(ctx, eventJSON.EventNID) roomNID := roomNIDs[result.EventNID]
if err != nil { roomVersion := roomVersions[roomNID]
return nil, err
}
if roomID, ok := d.Cache.GetRoomServerRoomID(roomNID); ok {
roomVersion, _ = d.Cache.GetRoomVersion(roomID)
}
if roomVersion == "" {
roomVersion, err = d.RoomsTable.SelectRoomVersionForRoomNID(ctx, roomNID)
if err != nil {
return nil, err
}
}
result.Event, err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID( result.Event, err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion, eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
) )

View file

@ -95,8 +95,8 @@ const bulkSelectEventNIDSQL = "" +
const selectMaxEventDepthSQL = "" + const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)" "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"
const selectRoomNIDForEventNIDSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT room_nid FROM roomserver_events WHERE event_nid = $1" "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
type eventStatements struct { type eventStatements struct {
db *sql.DB db *sql.DB
@ -112,7 +112,7 @@ type eventStatements struct {
bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventReferenceStmt *sql.Stmt
bulkSelectEventIDStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt
selectRoomNIDForEventNIDStmt *sql.Stmt //selectRoomNIDsForEventNIDsStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
@ -137,7 +137,7 @@ func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
{&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL}, {&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL},
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, //{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -480,11 +480,33 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx,
return result, nil return result, nil
} }
func (s *eventStatements) SelectRoomNIDForEventNID( func (s *eventStatements) SelectRoomNIDsForEventNIDs(
ctx context.Context, eventNID types.EventNID, ctx context.Context, eventNIDs []types.EventNID,
) (roomNID types.RoomNID, err error) { ) (map[types.EventNID]types.RoomNID, error) {
err = s.selectRoomNIDForEventNIDStmt.QueryRowContext(ctx, int64(eventNID)).Scan(&roomNID) sqlStr := strings.Replace(selectRoomNIDsForEventNIDsSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1)
return sqlPrep, err := s.db.Prepare(sqlStr)
if err != nil {
return nil, err
}
iEventNIDs := make([]interface{}, len(eventNIDs))
for i, v := range eventNIDs {
iEventNIDs[i] = v
}
rows, err := sqlPrep.QueryContext(ctx, iEventNIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
result := make(map[types.EventNID]types.RoomNID)
for rows.Next() {
var eventNID types.EventNID
var roomNID types.RoomNID
if err = rows.Scan(&eventNID, &roomNID); err != nil {
return nil, err
}
result[eventNID] = roomNID
}
return result, nil
} }
func eventNIDsAsArray(eventNIDs []types.EventNID) string { func eventNIDsAsArray(eventNIDs []types.EventNID) string {

View file

@ -19,7 +19,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"strings" "strings"
@ -60,8 +59,8 @@ const selectLatestEventNIDsForUpdateSQL = "" +
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
"UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4" "UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4"
const selectRoomVersionForRoomNIDSQL = "" + const selectRoomVersionsForRoomNIDsSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" "SELECT room_nid, room_version FROM roomserver_rooms WHERE room_nid IN ($1)"
const selectRoomInfoSQL = "" + const selectRoomInfoSQL = "" +
"SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1" "SELECT room_version, room_nid, state_snapshot_nid, latest_event_nids FROM roomserver_rooms WHERE room_id = $1"
@ -82,7 +81,7 @@ type roomStatements struct {
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
selectRoomVersionForRoomNIDStmt *sql.Stmt //selectRoomVersionForRoomNIDStmt *sql.Stmt
selectRoomInfoStmt *sql.Stmt selectRoomInfoStmt *sql.Stmt
selectRoomIDsStmt *sql.Stmt selectRoomIDsStmt *sql.Stmt
} }
@ -101,7 +100,7 @@ func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) {
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
{&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, //{&s.selectRoomVersionForRoomNIDsStmt, selectRoomVersionForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL}, {&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL}, {&s.selectRoomIDsStmt, selectRoomIDsSQL},
}.Prepare(db) }.Prepare(db)
@ -223,15 +222,33 @@ func (s *roomStatements) UpdateLatestEventNIDs(
return err return err
} }
func (s *roomStatements) SelectRoomVersionForRoomNID( func (s *roomStatements) SelectRoomVersionsForRoomNIDs(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNIDs []types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (map[types.RoomNID]gomatrixserverlib.RoomVersion, error) {
var roomVersion gomatrixserverlib.RoomVersion sqlStr := strings.Replace(selectRoomVersionsForRoomNIDsSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1)
err := s.selectRoomVersionForRoomNIDStmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion) sqlPrep, err := s.db.Prepare(sqlStr)
if err == sql.ErrNoRows { if err != nil {
return roomVersion, errors.New("room not found") return nil, err
} }
return roomVersion, err iRoomNIDs := make([]interface{}, len(roomNIDs))
for i, v := range roomNIDs {
iRoomNIDs[i] = v
}
rows, err := sqlPrep.QueryContext(ctx, iRoomNIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomVersionsForRoomNIDsStmt: rows.close() failed")
result := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
for rows.Next() {
var roomNID types.RoomNID
var roomVersion gomatrixserverlib.RoomVersion
if err = rows.Scan(&roomNID, &roomVersion); err != nil {
return nil, err
}
result[roomNID] = roomVersion
}
return result, nil
} }
func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) { func (s *roomStatements) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) {

View file

@ -11,6 +11,7 @@ import (
type EventJSONPair struct { type EventJSONPair struct {
EventNID types.EventNID EventNID types.EventNID
RoomVersion gomatrixserverlib.RoomVersion
EventJSON []byte EventJSON []byte
} }
@ -58,7 +59,7 @@ type Events interface {
// If an event ID is not in the database then it is omitted from the map. // If an event ID is not in the database then it is omitted from the map.
BulkSelectEventNID(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error) BulkSelectEventNID(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
SelectRoomNIDForEventNID(ctx context.Context, eventNID types.EventNID) (roomNID types.RoomNID, err error) SelectRoomNIDsForEventNIDs(ctx context.Context, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
} }
type Rooms interface { type Rooms interface {
@ -67,7 +68,7 @@ type Rooms interface {
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error) SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error) SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
SelectRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error) SelectRoomVersionsForRoomNIDs(ctx context.Context, roomNID []types.RoomNID) (map[types.RoomNID]gomatrixserverlib.RoomVersion, error)
SelectRoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error) SelectRoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
SelectRoomIDs(ctx context.Context) ([]string, error) SelectRoomIDs(ctx context.Context) ([]string, error)
BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error) BulkSelectRoomIDs(ctx context.Context, roomNIDs []types.RoomNID) ([]string, error)