[bugfix] Fix temp table deletion causing runaway allocations (#3278)

* [bugfix] Fix temp table deletion causing runaway allocations

* move some vars around

* small fixes

* rely on conn max age to recycle temp tables

* fackin' ell m8
This commit is contained in:
tobi 2024-09-08 16:14:56 +02:00 committed by GitHub
parent edbcf0fa6d
commit b17010cf17
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 154 additions and 83 deletions

View file

@ -352,7 +352,7 @@ func sqliteConn(ctx context.Context) (*bun.DB, error) {
} }
// Build SQLite connection address with prefs. // Build SQLite connection address with prefs.
address = buildSQLiteAddress(address) address, inMem := buildSQLiteAddress(address)
// Open new DB instance // Open new DB instance
sqldb, err := sql.Open("sqlite-gts", address) sqldb, err := sql.Open("sqlite-gts", address)
@ -365,7 +365,13 @@ func sqliteConn(ctx context.Context) (*bun.DB, error) {
// - https://www.alexedwards.net/blog/configuring-sqldb // - https://www.alexedwards.net/blog/configuring-sqldb
sqldb.SetMaxOpenConns(maxOpenConns()) // x number of conns per CPU sqldb.SetMaxOpenConns(maxOpenConns()) // x number of conns per CPU
sqldb.SetMaxIdleConns(1) // only keep max 1 idle connection around sqldb.SetMaxIdleConns(1) // only keep max 1 idle connection around
sqldb.SetConnMaxLifetime(0) // don't kill connections due to age if inMem {
log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests")
// Don't close aged connections as this may wipe the DB.
sqldb.SetConnMaxLifetime(0)
} else {
sqldb.SetConnMaxLifetime(5 * time.Minute)
}
db := bun.NewDB(sqldb, sqlitedialect.New()) db := bun.NewDB(sqldb, sqlitedialect.New())
@ -485,7 +491,8 @@ func deriveBunDBPGOptions() (*pgx.ConnConfig, error) {
// buildSQLiteAddress will build an SQLite address string from given config input, // buildSQLiteAddress will build an SQLite address string from given config input,
// appending user defined SQLite connection preferences (e.g. cache_size, journal_mode etc). // appending user defined SQLite connection preferences (e.g. cache_size, journal_mode etc).
func buildSQLiteAddress(addr string) string { // The returned bool indicates whether this is an in-memory address or not.
func buildSQLiteAddress(addr string) (string, bool) {
// Notes on SQLite preferences: // Notes on SQLite preferences:
// //
// - SQLite by itself supports setting a subset of its configuration options // - SQLite by itself supports setting a subset of its configuration options
@ -543,11 +550,11 @@ func buildSQLiteAddress(addr string) string {
// see https://pkg.go.dev/modernc.org/sqlite#Driver.Open // see https://pkg.go.dev/modernc.org/sqlite#Driver.Open
prefs.Add("_txlock", "immediate") prefs.Add("_txlock", "immediate")
inMem := false
if addr == ":memory:" { if addr == ":memory:" {
log.Warn(nil, "using sqlite in-memory mode; all data will be deleted when gts shuts down; this mode should only be used for debugging or running tests")
// Use random name for in-memory instead of ':memory:', so // Use random name for in-memory instead of ':memory:', so
// multiple in-mem databases can be created without conflict. // multiple in-mem databases can be created without conflict.
inMem = true
addr = "/" + uuid.NewString() addr = "/" + uuid.NewString()
prefs.Add("vfs", "memdb") prefs.Add("vfs", "memdb")
} }
@ -581,5 +588,5 @@ func buildSQLiteAddress(addr string) string {
b.WriteString(addr) b.WriteString(addr)
b.WriteString("?") b.WriteString("?")
b.WriteString(prefs.Encode()) b.WriteString(prefs.Encode())
return b.String() return b.String(), inMem
} }

View file

@ -21,6 +21,7 @@ import (
"context" "context"
"errors" "errors"
"slices" "slices"
"time"
"github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
@ -334,39 +335,81 @@ func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context
} }
func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error { func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error {
// SQL returning the current time. var (
var nowSQL string updatedConversationIDs = []string{}
switch c.db.Dialect().Name() { deletedConversationIDs = []string{}
case dialect.SQLite:
nowSQL = "DATE('now')"
case dialect.PG:
nowSQL = "NOW()"
default:
log.Panicf(nil, "db conn %s was neither pg nor sqlite", c.db)
}
updatedConversationIDs := []string{} // Method of creating + dropping temp
deletedConversationIDs := []string{} // tables differs depending on driver.
tmpQ string
)
if c.db.Dialect().Name() == dialect.PG {
// On Postgres, we can instruct PG to clean
// up temp tables on commit, so we can just
// use any connection from the pool without
// caring what happens to it when we're done.
tmpQ = "CREATE TEMPORARY TABLE ? ON COMMIT DROP AS (?)"
} else {
// On SQLite, we can't instruct SQLite to drop
// temp tables on commit, and we can't manually
// drop temp tables without triggering a bug.
// So we leave the temp tables alone, in the
// knowledge they'll be cleaned up when this
// connection gets recycled (in max 5min).
tmpQ = "CREATE TEMPORARY TABLE ? AS ?"
}
if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
// Delete this status from conversation-to-status links. // First delete this status from
if _, err := tx.NewDelete(). // conversation-to-status links.
Model((*gtsmodel.ConversationToStatus)(nil)). _, err := tx.
NewDelete().
Table("conversation_to_statuses").
Where("? = ?", bun.Ident("status_id"), statusID). Where("? = ?", bun.Ident("status_id"), statusID).
Exec(ctx); // nocollapse Exec(ctx)
err != nil { if err != nil {
return gtserror.Newf("error deleting conversation-to-status links while deleting status %s: %w", statusID, err) return gtserror.Newf(
"error deleting conversation-to-status links while deleting status %s: %w",
statusID, err,
)
} }
// Note: Bun doesn't currently support CREATE TABLE … AS SELECT … so we need to use raw queries here. // Note: Bun doesn't currently support `CREATE TABLE … AS SELECT …`
// so we need to use raw queries to create temporary tables.
// Create a temporary table with all statuses other than the deleted status // Create a temporary table containing all statuses other than
// in each conversation for which the deleted status is the last status // the deleted status, in each conversation for which the deleted
// (if there are such statuses). // status is the last status, if there are such statuses.
conversationStatusesTempTable := "conversation_statuses_" + id.NewULID() //
if _, err := tx.NewRaw( // This will produce a query like:
"CREATE TEMPORARY TABLE ? AS ?", //
bun.Ident(conversationStatusesTempTable), // CREATE TEMPORARY TABLE "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S"
// AS (
// SELECT
// "conversations"."id" AS "conversation_id",
// "conversation_to_statuses"."status_id" AS "id",
// "statuses"."created_at"
// FROM
// "conversations"
// LEFT JOIN "conversation_to_statuses" ON (
// "conversations"."id" = "conversation_to_statuses"."conversation_id"
// )
// AND (
// "conversation_to_statuses"."status_id" != '01J78T2BQ4TN5S2XSC9VNQ5GBS'
// )
// LEFT JOIN "statuses" ON (
// "conversation_to_statuses"."status_id" = "statuses"."id"
// )
// WHERE
// (
// "conversations"."last_status_id" = '01J78T2BQ4TN5S2XSC9VNQ5GBS'
// )
// )
conversationStatusesTmp := "conversation_statuses_" + id.NewULID()
conversationStatusesTmpQ := tx.NewRaw(
tmpQ,
bun.Ident(conversationStatusesTmp),
tx.NewSelect(). tx.NewSelect().
ColumnExpr( ColumnExpr(
"? AS ?", "? AS ?",
@ -402,18 +445,41 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat
bun.Ident("conversations.last_status_id"), bun.Ident("conversations.last_status_id"),
statusID, statusID,
), ),
). )
Exec(ctx); // nocollapse _, err = conversationStatusesTmpQ.Exec(ctx)
err != nil { if err != nil {
return gtserror.Newf("error creating conversationStatusesTempTable while deleting status %s: %w", statusID, err) return gtserror.Newf(
"error creating temp table %s while deleting status %s: %w",
conversationStatusesTmp, statusID, err,
)
} }
// Create a temporary table with the most recently created status in each conversation // Create a temporary table with the most recently created
// for which the deleted status is the last status (if there is such a status). // status in each conversation for which the deleted status
latestConversationStatusesTempTable := "latest_conversation_statuses_" + id.NewULID() // is the last status, if there is such a status.
if _, err := tx.NewRaw( //
"CREATE TEMPORARY TABLE ? AS ?", // This will produce a query like:
bun.Ident(latestConversationStatusesTempTable), //
// CREATE TEMPORARY TABLE "latest_conversation_statuses_01J78T2AR0E46SJSH6C7NRZ7MR"
// AS (
// SELECT
// "conversation_statuses"."conversation_id",
// "conversation_statuses"."id"
// FROM
// "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "conversation_statuses"
// LEFT JOIN "conversation_statuses_01J78T2AR0YCZ4YR12WSCZ608S" AS "later_statuses" ON (
// "conversation_statuses"."conversation_id" = "later_statuses"."conversation_id"
// )
// AND (
// "later_statuses"."created_at" > "conversation_statuses"."created_at"
// )
// WHERE
// ("later_statuses"."id" IS NULL)
// )
latestConversationStatusesTmp := "latest_conversation_statuses_" + id.NewULID()
latestConversationStatusesTmpQ := tx.NewRaw(
tmpQ,
bun.Ident(latestConversationStatusesTmp),
tx.NewSelect(). tx.NewSelect().
Column( Column(
"conversation_statuses.conversation_id", "conversation_statuses.conversation_id",
@ -421,12 +487,12 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat
). ).
TableExpr( TableExpr(
"? AS ?", "? AS ?",
bun.Ident(conversationStatusesTempTable), bun.Ident(conversationStatusesTmp),
bun.Ident("conversation_statuses"), bun.Ident("conversation_statuses"),
). ).
Join( Join(
"LEFT JOIN ? AS ?", "LEFT JOIN ? AS ?",
bun.Ident(conversationStatusesTempTable), bun.Ident(conversationStatusesTmp),
bun.Ident("later_statuses"), bun.Ident("later_statuses"),
). ).
JoinOn( JoinOn(
@ -440,68 +506,66 @@ func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, stat
bun.Ident("conversation_statuses.created_at"), bun.Ident("conversation_statuses.created_at"),
). ).
Where("? IS NULL", bun.Ident("later_statuses.id")), Where("? IS NULL", bun.Ident("later_statuses.id")),
). )
Exec(ctx); // nocollapse _, err = latestConversationStatusesTmpQ.Exec(ctx)
err != nil { if err != nil {
return gtserror.Newf("error creating latestConversationStatusesTempTable while deleting status %s: %w", statusID, err) return gtserror.Newf(
"error creating temp table %s while deleting status %s: %w",
conversationStatusesTmp, statusID, err,
)
} }
// For every conversation where the given status was the last one, // For every conversation where the given status was the last one,
// reset its last status to the most recently created in the conversation other than that one, // reset its last status to the most recently created in the
// if there is such a status. // conversation other than that one, if there is such a status.
// Return conversation IDs for invalidation. // Return conversation IDs for invalidation.
if err := tx.NewUpdate(). updateQ := tx.NewUpdate().
Model((*gtsmodel.Conversation)(nil)). Table("conversations").
SetColumn("last_status_id", "?", bun.Ident("latest_conversation_statuses.id")). TableExpr("? AS ?", bun.Ident(latestConversationStatusesTmp), bun.Ident("latest_conversation_statuses")).
SetColumn("updated_at", "?", bun.Safe(nowSQL)). Set("? = ?", bun.Ident("last_status_id"), bun.Ident("latest_conversation_statuses.id")).
TableExpr("? AS ?", bun.Ident(latestConversationStatusesTempTable), bun.Ident("latest_conversation_statuses")). Set("? = ?", bun.Ident("updated_at"), time.Now()).
Where("?TableAlias.? = ?", bun.Ident("id"), bun.Ident("latest_conversation_statuses.conversation_id")). Where("? = ?", bun.Ident("conversations.id"), bun.Ident("latest_conversation_statuses.conversation_id")).
Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")). Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")).
Returning("?TableName.?", bun.Ident("id")). Returning("?", bun.Ident("conversations.id"))
Scan(ctx, &updatedConversationIDs); // nocollapse _, err = updateQ.Exec(ctx, &updatedConversationIDs)
err != nil { if err != nil {
return gtserror.Newf("error rolling back last status for conversation while deleting status %s: %w", statusID, err) return gtserror.Newf(
"error rolling back last status for conversation while deleting status %s: %w",
statusID, err,
)
} }
// If there is no such status, delete the conversation. // If there is no such status,
// Return conversation IDs for invalidation. // just delete the conversation.
if err := tx.NewDelete(). // Return IDs for invalidation.
Model((*gtsmodel.Conversation)(nil)). _, err = tx.
NewDelete().
Table("conversations").
Where( Where(
"? IN (?)", "? IN (?)",
bun.Ident("id"), bun.Ident("id"),
tx.NewSelect(). tx.NewSelect().
Table(latestConversationStatusesTempTable). Table(latestConversationStatusesTmp).
Column("conversation_id"). Column("conversation_id").
Where("? IS NULL", bun.Ident("id")), Where("? IS NULL", bun.Ident("id")),
). ).
Returning("?", bun.Ident("id")). Returning("?", bun.Ident("id")).
Scan(ctx, &deletedConversationIDs); // nocollapse Exec(ctx, &deletedConversationIDs)
err != nil { if err != nil {
return gtserror.Newf("error deleting conversation while deleting status %s: %w", statusID, err)
}
// Clean up.
for _, tempTable := range []string{
conversationStatusesTempTable,
latestConversationStatusesTempTable,
} {
if _, err := tx.NewDropTable().Table(tempTable).Exec(ctx); err != nil {
return gtserror.Newf( return gtserror.Newf(
"error dropping temporary table %s after deleting status %s: %w", "error deleting conversation while deleting status %s: %w",
tempTable, statusID, err,
statusID,
err,
) )
} }
}
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
// Invalidate cache entries.
updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...) updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...)
updatedConversationIDs = util.Deduplicate(updatedConversationIDs)
c.state.Caches.DB.Conversation.InvalidateIDs("ID", updatedConversationIDs) c.state.Caches.DB.Conversation.InvalidateIDs("ID", updatedConversationIDs)
return nil return nil