Clean old notifications regularly (#2244)

* Clean old notifications regularly

We'll keep highlights for a month and non-highlights for a day, to stop the `userapi_notifications` table from growing indefinitely.

We'll also allow storing events even if no pushers are present, because apparently Element Web expects to work that way.

* Fix the milliseconds

* Use process context

* Update sytest lists

* Fix build issue
This commit is contained in:
Neil Alexander 2022-03-03 16:45:06 +00:00 committed by GitHub
parent c44029f269
commit 5592322e13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 63 additions and 16 deletions

View file

@ -35,3 +35,4 @@ AS-ghosted users can use rooms themselves
# Flakey, need additional investigation # Flakey, need additional investigation
Messages that notify from another user increment notification_count Messages that notify from another user increment notification_count
Messages that highlight from another user increment unread highlight count Messages that highlight from another user increment unread highlight count
Notifications can be viewed with GET /notifications

View file

@ -638,7 +638,6 @@ Rooms with many users are correctly pushed
Don't get pushed for rooms you've muted Don't get pushed for rooms you've muted
Rejected events are not pushed Rejected events are not pushed
Test that rejected pushers are removed. Test that rejected pushers are removed.
Notifications can be viewed with GET /notifications
Trying to add push rule with no scope fails with 400 Trying to add push rule with no scope fails with 400
Trying to add push rule with invalid scope fails with 400 Trying to add push rule with invalid scope fails with 400
Forward extremities remain so even after the next events are populated as outliers Forward extremities remain so even after the next events are populated as outliers

View file

@ -139,9 +139,6 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g
// removing it means we can send all notifications to // removing it means we can send all notifications to
// e.g. Element's Push gateway in one go. // e.g. Element's Push gateway in one go.
for _, mem := range members { for _, mem := range members {
if p, err := s.db.GetPushers(ctx, mem.Localpart); err != nil || len(p) == 0 {
continue
}
if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil { if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"localpart": mem.Localpart, "localpart": mem.Localpart,

View file

@ -97,6 +97,7 @@ type Database interface {
GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error) GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error)
GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error) GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error)
GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error) GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error)
DeleteOldNotifications(ctx context.Context) error
UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error UpsertPusher(ctx context.Context, p api.Pusher, localpart string) error
GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error)

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"time"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -28,12 +29,13 @@ import (
) )
type notificationsStatements struct { type notificationsStatements struct {
insertStmt *sql.Stmt insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt updateReadStmt *sql.Stmt
selectStmt *sql.Stmt selectStmt *sql.Stmt
selectCountStmt *sql.Stmt selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt selectRoomCountsStmt *sql.Stmt
cleanNotificationsStmt *sql.Stmt
} }
const notificationSchema = ` const notificationSchema = `
@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read" "WHERE localpart = $1 AND room_id = $2 AND NOT read"
const cleanNotificationsSQL = "" +
"DELETE FROM userapi_notifications WHERE" +
" (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) { func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{} s := &notificationsStatements{}
_, err := db.Exec(notificationSchema) _, err := db.Exec(notificationSchema)
@ -90,9 +96,19 @@ func NewPostgresNotificationTable(db *sql.DB) (tables.NotificationTable, error)
{&s.selectStmt, selectNotificationSQL}, {&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
{&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db) }.Prepare(db)
} }
func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
ctx,
time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
)
return err
}
// Insert inserts a notification into the database. // Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS roomID, tsMS := n.RoomID, n.TS

View file

@ -705,6 +705,10 @@ func (d *Database) GetRoomNotificationCounts(ctx context.Context, localpart, roo
return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID) return d.Notifications.SelectRoomCounts(ctx, nil, localpart, roomID)
} }
func (d *Database) DeleteOldNotifications(ctx context.Context) error {
return d.Notifications.Clean(ctx, nil)
}
func (d *Database) UpsertPusher( func (d *Database) UpsertPusher(
ctx context.Context, p api.Pusher, localpart string, ctx context.Context, p api.Pusher, localpart string,
) error { ) error {

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"time"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -28,12 +29,13 @@ import (
) )
type notificationsStatements struct { type notificationsStatements struct {
insertStmt *sql.Stmt insertStmt *sql.Stmt
deleteUpToStmt *sql.Stmt deleteUpToStmt *sql.Stmt
updateReadStmt *sql.Stmt updateReadStmt *sql.Stmt
selectStmt *sql.Stmt selectStmt *sql.Stmt
selectCountStmt *sql.Stmt selectCountStmt *sql.Stmt
selectRoomCountsStmt *sql.Stmt selectRoomCountsStmt *sql.Stmt
cleanNotificationsStmt *sql.Stmt
} }
const notificationSchema = ` const notificationSchema = `
@ -77,6 +79,10 @@ const selectRoomNotificationCountsSQL = "" +
"SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " + "SELECT COUNT(*), COUNT(*) FILTER (WHERE highlight) FROM userapi_notifications " +
"WHERE localpart = $1 AND room_id = $2 AND NOT read" "WHERE localpart = $1 AND room_id = $2 AND NOT read"
const cleanNotificationsSQL = "" +
"DELETE FROM userapi_notifications WHERE" +
" (highlight = FALSE AND ts_ms < $1) OR (highlight = TRUE AND ts_ms < $2)"
func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) { func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
s := &notificationsStatements{} s := &notificationsStatements{}
_, err := db.Exec(notificationSchema) _, err := db.Exec(notificationSchema)
@ -90,9 +96,19 @@ func NewSQLiteNotificationTable(db *sql.DB) (tables.NotificationTable, error) {
{&s.selectStmt, selectNotificationSQL}, {&s.selectStmt, selectNotificationSQL},
{&s.selectCountStmt, selectNotificationCountSQL}, {&s.selectCountStmt, selectNotificationCountSQL},
{&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL}, {&s.selectRoomCountsStmt, selectRoomNotificationCountsSQL},
{&s.cleanNotificationsStmt, cleanNotificationsSQL},
}.Prepare(db) }.Prepare(db)
} }
func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error {
_, err := sqlutil.TxStmt(txn, s.cleanNotificationsStmt).ExecContext(
ctx,
time.Now().AddDate(0, 0, -1).UnixNano()/int64(time.Millisecond), // keep non-highlights for a day
time.Now().AddDate(0, -1, 0).UnixNano()/int64(time.Millisecond), // keep highlights for a month
)
return err
}
// Insert inserts a notification into the database. // Insert inserts a notification into the database.
func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error { func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS roomID, tsMS := n.RoomID, n.TS

View file

@ -103,6 +103,7 @@ type PusherTable interface {
} }
type NotificationTable interface { type NotificationTable interface {
Clean(ctx context.Context, txn *sql.Tx) error
Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error
DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error)
UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error)

View file

@ -15,6 +15,8 @@
package userapi package userapi
import ( import (
"time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
@ -79,5 +81,15 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start user API streamed event consumer") logrus.WithError(err).Panic("failed to start user API streamed event consumer")
} }
var cleanOldNotifs func()
cleanOldNotifs = func() {
logrus.Infof("Cleaning old notifications")
if err := db.DeleteOldNotifications(base.Context()); err != nil {
logrus.WithError(err).Error("Failed to clean old notifications")
}
time.AfterFunc(time.Hour, cleanOldNotifs)
}
time.AfterFunc(time.Minute, cleanOldNotifs)
return userAPI return userAPI
} }