diff --git a/federationsender/storage/postgres/deltas/2021020411080000_rooms.go b/federationsender/storage/postgres/deltas/2021020411080000_rooms.go new file mode 100644 index 000000000..cc4bdadfd --- /dev/null +++ b/federationsender/storage/postgres/deltas/2021020411080000_rooms.go @@ -0,0 +1,46 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func LoadRemoveRoomsTable(m *sqlutil.Migrations) { + m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func UpRemoveRoomsTable(tx *sql.Tx) error { + _, err := tx.Exec(` + DROP TABLE IF EXISTS federationsender_rooms; + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownRemoveRoomsTable(tx *sql.Tx) error { + // We can't reverse this. + return nil +} diff --git a/federationsender/storage/postgres/room_table.go b/federationsender/storage/postgres/room_table.go deleted file mode 100644 index 8d3ed20ff..000000000 --- a/federationsender/storage/postgres/room_table.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal/sqlutil" -) - -const roomSchema = ` -CREATE TABLE IF NOT EXISTS federationsender_rooms ( - -- The string ID of the room - room_id TEXT PRIMARY KEY, - -- The most recent event state by the room server. - -- We can use this to tell if our view of the room state has become - -- desynchronised. - last_event_id TEXT NOT NULL -);` - -const insertRoomSQL = "" + - "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" + - " ON CONFLICT DO NOTHING" - -const selectRoomForUpdateSQL = "" + - "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1 FOR UPDATE" - -const updateRoomSQL = "" + - "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1" - -type roomStatements struct { - db *sql.DB - insertRoomStmt *sql.Stmt - selectRoomForUpdateStmt *sql.Stmt - updateRoomStmt *sql.Stmt -} - -func NewPostgresRoomsTable(db *sql.DB) (s *roomStatements, err error) { - s = &roomStatements{ - db: db, - } - _, err = s.db.Exec(roomSchema) - if err != nil { - return - } - if s.insertRoomStmt, err = s.db.Prepare(insertRoomSQL); err != nil { - return - } - if s.selectRoomForUpdateStmt, err = s.db.Prepare(selectRoomForUpdateSQL); err != nil { - return - } - if s.updateRoomStmt, err = s.db.Prepare(updateRoomSQL); err != nil { - return - } - return -} - -// insertRoom inserts the room if it didn't already exist. -// If the room didn't exist then last_event_id is set to the empty string. -func (s *roomStatements) InsertRoom( - ctx context.Context, txn *sql.Tx, roomID string, -) error { - _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) - return err -} - -// selectRoomForUpdate locks the row for the room and returns the last_event_id. -// The row must already exist in the table. Callers can ensure that the row -// exists by calling insertRoom first. -func (s *roomStatements) SelectRoomForUpdate( - ctx context.Context, txn *sql.Tx, roomID string, -) (string, error) { - var lastEventID string - stmt := sqlutil.TxStmt(txn, s.selectRoomForUpdateStmt) - err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID) - if err != nil { - return "", err - } - return lastEventID, nil -} - -// updateRoom updates the last_event_id for the room. selectRoomForUpdate should -// have already been called earlier within the transaction. -func (s *roomStatements) UpdateRoom( - ctx context.Context, txn *sql.Tx, roomID, lastEventID string, -) error { - stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) - _, err := stmt.ExecContext(ctx, roomID, lastEventID) - return err -} diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index b9827ca19..5edc08ad7 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -18,6 +18,7 @@ package postgres import ( "database/sql" + "github.com/matrix-org/dendrite/federationsender/storage/postgres/deltas" "github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -56,10 +57,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } - rooms, err := NewPostgresRoomsTable(d.db) - if err != nil { - return nil, err - } blacklist, err := NewPostgresBlacklistTable(d.db) if err != nil { return nil, err @@ -72,6 +69,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + m := sqlutil.NewMigrations() + deltas.LoadRemoveRoomsTable(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Cache: cache, @@ -80,7 +82,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderQueuePDUs: queuePDUs, FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueJSON: queueJSON, - FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, FederationSenderInboundPeeks: inboundPeeks, FederationSenderOutboundPeeks: outboundPeeks, diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 4c9490424..2e74e9d6a 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -34,7 +34,6 @@ type Database struct { FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueJSON tables.FederationSenderQueueJSON FederationSenderJoinedHosts tables.FederationSenderJoinedHosts - FederationSenderRooms tables.FederationSenderRooms FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks FederationSenderInboundPeeks tables.FederationSenderInboundPeeks @@ -64,29 +63,6 @@ func (d *Database) UpdateRoom( removeHosts []string, ) (joinedHosts []types.JoinedHost, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID) - if err != nil { - return err - } - - lastSentEventID, err := d.FederationSenderRooms.SelectRoomForUpdate(ctx, txn, roomID) - if err != nil { - return err - } - - if lastSentEventID == newEventID { - // We've handled this message before, so let's just ignore it. - // We can only get a duplicate for the last message we processed, - // so its enough just to compare the newEventID with lastSentEventID - return nil - } - - if lastSentEventID != "" && lastSentEventID != oldEventID { - return types.EventIDMismatchError{ - DatabaseID: lastSentEventID, RoomServerID: oldEventID, - } - } - joinedHosts, err = d.FederationSenderJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err @@ -101,7 +77,7 @@ func (d *Database) UpdateRoom( if err = d.FederationSenderJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil { return err } - return d.FederationSenderRooms.UpdateRoom(ctx, txn, roomID, newEventID) + return nil }) return } diff --git a/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go b/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go new file mode 100644 index 000000000..cc4bdadfd --- /dev/null +++ b/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go @@ -0,0 +1,46 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func LoadRemoveRoomsTable(m *sqlutil.Migrations) { + m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func UpRemoveRoomsTable(tx *sql.Tx) error { + _, err := tx.Exec(` + DROP TABLE IF EXISTS federationsender_rooms; + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownRemoveRoomsTable(tx *sql.Tx) error { + // We can't reverse this. + return nil +} diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go deleted file mode 100644 index 0710ccca3..000000000 --- a/federationsender/storage/sqlite3/room_table.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal/sqlutil" -) - -const roomSchema = ` -CREATE TABLE IF NOT EXISTS federationsender_rooms ( - -- The string ID of the room - room_id TEXT PRIMARY KEY, - -- The most recent event state by the room server. - -- We can use this to tell if our view of the room state has become - -- desynchronised. - last_event_id TEXT NOT NULL -);` - -const insertRoomSQL = "" + - "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" + - " ON CONFLICT DO NOTHING" - -const selectRoomForUpdateSQL = "" + - "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1" - -const updateRoomSQL = "" + - "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1" - -type roomStatements struct { - db *sql.DB - insertRoomStmt *sql.Stmt - selectRoomForUpdateStmt *sql.Stmt - updateRoomStmt *sql.Stmt -} - -func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) { - s = &roomStatements{ - db: db, - } - _, err = db.Exec(roomSchema) - if err != nil { - return - } - - if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil { - return - } - if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil { - return - } - if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil { - return - } - return -} - -// insertRoom inserts the room if it didn't already exist. -// If the room didn't exist then last_event_id is set to the empty string. -func (s *roomStatements) InsertRoom( - ctx context.Context, txn *sql.Tx, roomID string, -) error { - _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) - return err -} - -// selectRoomForUpdate locks the row for the room and returns the last_event_id. -// The row must already exist in the table. Callers can ensure that the row -// exists by calling insertRoom first. -func (s *roomStatements) SelectRoomForUpdate( - ctx context.Context, txn *sql.Tx, roomID string, -) (string, error) { - var lastEventID string - stmt := sqlutil.TxStmt(txn, s.selectRoomForUpdateStmt) - err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID) - if err != nil { - return "", err - } - return lastEventID, nil -} - -// updateRoom updates the last_event_id for the room. selectRoomForUpdate should -// have already been called earlier within the transaction. -func (s *roomStatements) UpdateRoom( - ctx context.Context, txn *sql.Tx, roomID, lastEventID string, -) error { - stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) - _, err := stmt.ExecContext(ctx, roomID, lastEventID) - return err -} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 2b1358587..84a9ff860 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -21,6 +21,7 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationsender/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" @@ -46,10 +47,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } - rooms, err := NewSQLiteRoomsTable(d.db) - if err != nil { - return nil, err - } queuePDUs, err := NewSQLiteQueuePDUsTable(d.db) if err != nil { return nil, err @@ -74,6 +71,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + m := sqlutil.NewMigrations() + deltas.LoadRemoveRoomsTable(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Cache: cache, @@ -82,7 +84,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderQueuePDUs: queuePDUs, FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueJSON: queueJSON, - FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, FederationSenderOutboundPeeks: outboundPeeks, FederationSenderInboundPeeks: inboundPeeks, diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 22fd5554f..34ff0b97e 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -56,12 +56,6 @@ type FederationSenderJoinedHosts interface { SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) } -type FederationSenderRooms interface { - InsertRoom(ctx context.Context, txn *sql.Tx, roomID string) error - SelectRoomForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (string, error) - UpdateRoom(ctx context.Context, txn *sql.Tx, roomID, lastEventID string) error -} - type FederationSenderBlacklist interface { InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) diff --git a/federationsender/types/types.go b/federationsender/types/types.go index 90da310c9..c486c05c4 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -15,8 +15,6 @@ package types import ( - "fmt" - "github.com/matrix-org/gomatrixserverlib" ) @@ -34,22 +32,6 @@ func (s ServerNames) Len() int { return len(s) } func (s ServerNames) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s ServerNames) Less(i, j int) bool { return s[i] < s[j] } -// A EventIDMismatchError indicates that we have got out of sync with the -// room server. -type EventIDMismatchError struct { - // The event ID we have stored in our local database. - DatabaseID string - // The event ID received from the room server. - RoomServerID string -} - -func (e EventIDMismatchError) Error() string { - return fmt.Sprintf( - "mismatched last sent event ID: had %q in database got %q from room server", - e.DatabaseID, e.RoomServerID, - ) -} - // tracks peeks we're performing on another server over federation type OutboundPeek struct { PeekID string