mirror of
https://github.com/matrix-org/dendrite
synced 2025-01-05 17:58:42 +00:00
Roomserver database-wide TransactionWriters (#1282)
* Database-wide TransactionWriter * Fix deadlocking Sync API tests * Undo non-roomserver changes for now
This commit is contained in:
parent
e571e196ce
commit
3d58417555
15 changed files with 44 additions and 43 deletions
|
@ -54,10 +54,10 @@ type eventJSONStatements struct {
|
||||||
bulkSelectEventJSONStmt *sql.Stmt
|
bulkSelectEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
func NewSqliteEventJSONTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.EventJSON, error) {
|
||||||
s := &eventJSONStatements{
|
s := &eventJSONStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventJSONSchema)
|
_, err := db.Exec(eventJSONSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -71,10 +71,10 @@ type eventStateKeyStatements struct {
|
||||||
bulkSelectEventStateKeyStmt *sql.Stmt
|
bulkSelectEventStateKeyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
func NewSqliteEventStateKeysTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.EventStateKeys, error) {
|
||||||
s := &eventStateKeyStatements{
|
s := &eventStateKeyStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventStateKeysSchema)
|
_, err := db.Exec(eventStateKeysSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -85,10 +85,10 @@ type eventTypeStatements struct {
|
||||||
bulkSelectEventTypeNIDStmt *sql.Stmt
|
bulkSelectEventTypeNIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
func NewSqliteEventTypesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.EventTypes, error) {
|
||||||
s := &eventTypeStatements{
|
s := &eventTypeStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventTypesSchema)
|
_, err := db.Exec(eventTypesSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -115,10 +115,10 @@ type eventStatements struct {
|
||||||
selectRoomNIDForEventNIDStmt *sql.Stmt
|
selectRoomNIDForEventNIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
|
func NewSqliteEventsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Events, error) {
|
||||||
s := &eventStatements{
|
s := &eventStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(eventsSchema)
|
_, err := db.Exec(eventsSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -71,10 +71,10 @@ type inviteStatements struct {
|
||||||
selectInvitesAboutToRetireStmt *sql.Stmt
|
selectInvitesAboutToRetireStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteInvitesTable(db *sql.DB) (tables.Invites, error) {
|
func NewSqliteInvitesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Invites, error) {
|
||||||
s := &inviteStatements{
|
s := &inviteStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(inviteSchema)
|
_, err := db.Exec(inviteSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -124,7 +124,7 @@ func (s *inviteStatements) UpdateInviteRetired(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer (func() { err = rows.Close() })()
|
defer internal.CloseAndLogIfError(ctx, rows, "UpdateInviteRetired: rows.close() failed")
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var inviteEventID string
|
var inviteEventID string
|
||||||
if err = rows.Scan(&inviteEventID); err != nil {
|
if err = rows.Scan(&inviteEventID); err != nil {
|
||||||
|
|
|
@ -88,10 +88,10 @@ type membershipStatements struct {
|
||||||
updateMembershipStmt *sql.Stmt
|
updateMembershipStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteMembershipTable(db *sql.DB) (tables.Membership, error) {
|
func NewSqliteMembershipTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Membership, error) {
|
||||||
s := &membershipStatements{
|
s := &membershipStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(membershipSchema)
|
_, err := db.Exec(membershipSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -59,10 +59,10 @@ type previousEventStatements struct {
|
||||||
selectPreviousEventExistsStmt *sql.Stmt
|
selectPreviousEventExistsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqlitePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
func NewSqlitePrevEventsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.PreviousEvents, error) {
|
||||||
s := &previousEventStatements{
|
s := &previousEventStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(previousEventSchema)
|
_, err := db.Exec(previousEventSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -51,10 +51,10 @@ type publishedStatements struct {
|
||||||
selectPublishedStmt *sql.Stmt
|
selectPublishedStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqlitePublishedTable(db *sql.DB) (tables.Published, error) {
|
func NewSqlitePublishedTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Published, error) {
|
||||||
s := &publishedStatements{
|
s := &publishedStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(publishedSchema)
|
_, err := db.Exec(publishedSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -60,10 +60,10 @@ type redactionStatements struct {
|
||||||
markRedactionValidatedStmt *sql.Stmt
|
markRedactionValidatedStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteRedactionsTable(db *sql.DB) (tables.Redactions, error) {
|
func NewSqliteRedactionsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Redactions, error) {
|
||||||
s := &redactionStatements{
|
s := &redactionStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(redactionsSchema)
|
_, err := db.Exec(redactionsSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -65,10 +65,10 @@ type roomAliasesStatements struct {
|
||||||
deleteRoomAliasStmt *sql.Stmt
|
deleteRoomAliasStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
|
func NewSqliteRoomAliasesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.RoomAliases, error) {
|
||||||
s := &roomAliasesStatements{
|
s := &roomAliasesStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(roomAliasesSchema)
|
_, err := db.Exec(roomAliasesSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -76,10 +76,10 @@ type roomStatements struct {
|
||||||
selectRoomVersionForRoomNIDStmt *sql.Stmt
|
selectRoomVersionForRoomNIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) {
|
func NewSqliteRoomsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Rooms, error) {
|
||||||
s := &roomStatements{
|
s := &roomStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(roomsSchema)
|
_, err := db.Exec(roomsSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -81,10 +81,10 @@ type stateBlockStatements struct {
|
||||||
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
|
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
|
func NewSqliteStateBlockTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.StateBlock, error) {
|
||||||
s := &stateBlockStatements{
|
s := &stateBlockStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(stateDataSchema)
|
_, err := db.Exec(stateDataSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -55,10 +55,10 @@ type stateSnapshotStatements struct {
|
||||||
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
func NewSqliteStateSnapshotTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.StateSnapshot, error) {
|
||||||
s := &stateSnapshotStatements{
|
s := &stateSnapshotStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(stateSnapshotSchema)
|
_, err := db.Exec(stateSnapshotSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -51,6 +51,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
writer := sqlutil.NewTransactionWriter()
|
||||||
//d.db.Exec("PRAGMA journal_mode=WAL;")
|
//d.db.Exec("PRAGMA journal_mode=WAL;")
|
||||||
//d.db.Exec("PRAGMA read_uncommitted = true;")
|
//d.db.Exec("PRAGMA read_uncommitted = true;")
|
||||||
|
|
||||||
|
@ -60,59 +61,59 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
// which it will never obtain.
|
// which it will never obtain.
|
||||||
d.db.SetMaxOpenConns(20)
|
d.db.SetMaxOpenConns(20)
|
||||||
|
|
||||||
d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db)
|
d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.eventTypes, err = NewSqliteEventTypesTable(d.db)
|
d.eventTypes, err = NewSqliteEventTypesTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.eventJSON, err = NewSqliteEventJSONTable(d.db)
|
d.eventJSON, err = NewSqliteEventJSONTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.events, err = NewSqliteEventsTable(d.db)
|
d.events, err = NewSqliteEventsTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.rooms, err = NewSqliteRoomsTable(d.db)
|
d.rooms, err = NewSqliteRoomsTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.transactions, err = NewSqliteTransactionsTable(d.db)
|
d.transactions, err = NewSqliteTransactionsTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stateBlock, err := NewSqliteStateBlockTable(d.db)
|
stateBlock, err := NewSqliteStateBlockTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stateSnapshot, err := NewSqliteStateSnapshotTable(d.db)
|
stateSnapshot, err := NewSqliteStateSnapshotTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.prevEvents, err = NewSqlitePrevEventsTable(d.db)
|
d.prevEvents, err = NewSqlitePrevEventsTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
roomAliases, err := NewSqliteRoomAliasesTable(d.db)
|
roomAliases, err := NewSqliteRoomAliasesTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.invites, err = NewSqliteInvitesTable(d.db)
|
d.invites, err = NewSqliteInvitesTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.membership, err = NewSqliteMembershipTable(d.db)
|
d.membership, err = NewSqliteMembershipTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
published, err := NewSqlitePublishedTable(d.db)
|
published, err := NewSqlitePublishedTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
redactions, err := NewSqliteRedactionsTable(d.db)
|
redactions, err := NewSqliteRedactionsTable(d.db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,10 +50,10 @@ type transactionStatements struct {
|
||||||
selectTransactionEventIDStmt *sql.Stmt
|
selectTransactionEventIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteTransactionsTable(db *sql.DB) (tables.Transactions, error) {
|
func NewSqliteTransactionsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Transactions, error) {
|
||||||
s := &transactionStatements{
|
s := &transactionStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(transactionsSchema)
|
_, err := db.Exec(transactionsSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue