mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-14 07:12:53 +00:00
More SQLite (#871)
* SQLite support for appservice * SQLite support for mediaapi * Copyright notices * SQLite for public rooms API (although with some slight differences in behaviour) * Lazy match aliases, add TODOs
This commit is contained in:
parent
409fec2a48
commit
3dabf4d4ed
17 changed files with 1034 additions and 111 deletions
|
@ -34,7 +34,7 @@ import (
|
|||
type OutputRoomEventConsumer struct {
|
||||
roomServerConsumer *common.ContinualConsumer
|
||||
db accounts.Database
|
||||
asDB *storage.Database
|
||||
asDB storage.Database
|
||||
query api.RoomserverQueryAPI
|
||||
alias api.RoomserverAliasAPI
|
||||
serverName string
|
||||
|
@ -47,7 +47,7 @@ func NewOutputRoomEventConsumer(
|
|||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store accounts.Database,
|
||||
appserviceDB *storage.Database,
|
||||
appserviceDB storage.Database,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
workerStates []types.ApplicationServiceWorkerState,
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -12,7 +13,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
111
appservice/storage/postgres/storage.go
Normal file
111
appservice/storage/postgres/storage.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
// Import postgres database driver
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewDatabase opens a new database
|
||||
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||
var result Database
|
||||
var err error
|
||||
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (d *Database) prepare() error {
|
||||
if err := d.events.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.txnID.prepare(d.db)
|
||||
}
|
||||
|
||||
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
|
||||
// for a transaction worker to pull and later send to an application service.
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
event *gomatrixserverlib.Event,
|
||||
) error {
|
||||
return d.events.insertEvent(ctx, appServiceID, event)
|
||||
}
|
||||
|
||||
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
|
||||
// be sent to an application service given its ID.
|
||||
func (d *Database) GetEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
limit int,
|
||||
) (int, int, []gomatrixserverlib.Event, bool, error) {
|
||||
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
||||
}
|
||||
|
||||
// CountEventsWithAppServiceID returns the number of events destined for an
|
||||
// application service given its ID.
|
||||
func (d *Database) CountEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
|
||||
}
|
||||
|
||||
// UpdateTxnIDForEvents takes in an application service ID and a
|
||||
// and stores them in the DB, unless the pair already exists, in
|
||||
// which case it updates them.
|
||||
func (d *Database) UpdateTxnIDForEvents(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
maxID, txnID int,
|
||||
) error {
|
||||
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
|
||||
}
|
||||
|
||||
// RemoveEventsBeforeAndIncludingID removes all events from the database that
|
||||
// are less than or equal to a given maximum ID. IDs here are implemented as a
|
||||
// serial, thus this should always delete events in chronological order.
|
||||
func (d *Database) RemoveEventsBeforeAndIncludingID(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
eventTableID int,
|
||||
) error {
|
||||
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
|
||||
}
|
||||
|
||||
// GetLatestTxnID returns the latest available transaction id
|
||||
func (d *Database) GetLatestTxnID(
|
||||
ctx context.Context,
|
||||
) (int, error) {
|
||||
return d.txnID.selectTxnID(ctx)
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -12,7 +13,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
249
appservice/storage/sqlite3/appservice_events_table.go
Normal file
249
appservice/storage/sqlite3/appservice_events_table.go
Normal file
|
@ -0,0 +1,249 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const appserviceEventsSchema = `
|
||||
-- Stores events to be sent to application services
|
||||
CREATE TABLE IF NOT EXISTS appservice_events (
|
||||
-- An auto-incrementing id unique to each event in the table
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
-- The ID of the application service the event will be sent to
|
||||
as_id TEXT NOT NULL,
|
||||
-- JSON representation of the event
|
||||
event_json TEXT NOT NULL,
|
||||
-- The ID of the transaction that this event is a part of
|
||||
txn_id INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
|
||||
`
|
||||
|
||||
const selectEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT id, event_json, txn_id " +
|
||||
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
||||
|
||||
const countEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
||||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
|
||||
"VALUES ($1, $2, $3)"
|
||||
|
||||
const updateTxnIDForEventsSQL = "" +
|
||||
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
|
||||
|
||||
const deleteEventsBeforeAndIncludingIDSQL = "" +
|
||||
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
|
||||
|
||||
const (
|
||||
// A transaction ID number that no transaction should ever have. Used for
|
||||
// checking again the default value.
|
||||
invalidTxnID = -2
|
||||
)
|
||||
|
||||
type eventsStatements struct {
|
||||
selectEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
countEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
updateTxnIDForEventsStmt *sql.Stmt
|
||||
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(appserviceEventsSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// selectEventsByApplicationServiceID takes in an application service ID and
|
||||
// returns a slice of events that need to be sent to that application service,
|
||||
// as well as an int later used to remove these same events from the database
|
||||
// once successfully sent to an application service.
|
||||
func (s *eventsStatements) selectEventsByApplicationServiceID(
|
||||
ctx context.Context,
|
||||
applicationServiceID string,
|
||||
limit int,
|
||||
) (
|
||||
txnID, maxID int,
|
||||
events []gomatrixserverlib.Event,
|
||||
eventsRemaining bool,
|
||||
err error,
|
||||
) {
|
||||
// Retrieve events from the database. Unsuccessfully sent events first
|
||||
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
err = eventRows.Close()
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"appservice": applicationServiceID,
|
||||
}).WithError(err).Fatalf("appservice unable to select new events to send")
|
||||
}
|
||||
}()
|
||||
events, maxID, txnID, eventsRemaining, err = retrieveEvents(eventRows, limit)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
|
||||
// Get current time for use in calculating event age
|
||||
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
|
||||
// Iterate through each row and store event contents
|
||||
// If txn_id changes dramatically, we've switched from collecting old events to
|
||||
// new ones. Send back those events first.
|
||||
lastTxnID := invalidTxnID
|
||||
for eventsProcessed := 0; eventRows.Next(); {
|
||||
var event gomatrixserverlib.Event
|
||||
var eventJSON []byte
|
||||
var id int
|
||||
err = eventRows.Scan(
|
||||
&id,
|
||||
&eventJSON,
|
||||
&txnID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, 0, false, err
|
||||
}
|
||||
|
||||
// Unmarshal eventJSON
|
||||
if err = json.Unmarshal(eventJSON, &event); err != nil {
|
||||
return nil, 0, 0, false, err
|
||||
}
|
||||
|
||||
// If txnID has changed on this event from the previous event, then we've
|
||||
// reached the end of a transaction's events. Return only those events.
|
||||
if lastTxnID > invalidTxnID && lastTxnID != txnID {
|
||||
return events, maxID, lastTxnID, true, nil
|
||||
}
|
||||
lastTxnID = txnID
|
||||
|
||||
// Limit events that aren't part of an old transaction
|
||||
if txnID == -1 {
|
||||
// Return if we've hit the limit
|
||||
if eventsProcessed++; eventsProcessed > limit {
|
||||
return events, maxID, lastTxnID, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
if id > maxID {
|
||||
maxID = id
|
||||
}
|
||||
|
||||
// Portion of the event that is unsigned due to rapid change
|
||||
// TODO: Consider removing age as not many app services use it
|
||||
if err = event.SetUnsignedField("age", nowMilli-int64(event.OriginServerTS())); err != nil {
|
||||
return nil, 0, 0, false, err
|
||||
}
|
||||
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
|
||||
// IDs into the db.
|
||||
func (s *eventsStatements) countEventsByApplicationServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
var count int
|
||||
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// insertEvent inserts an event mapped to its corresponding application service
|
||||
// IDs into the db.
|
||||
func (s *eventsStatements) insertEvent(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
event *gomatrixserverlib.Event,
|
||||
) (err error) {
|
||||
// Convert event to JSON before inserting
|
||||
eventJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = s.insertEventStmt.ExecContext(
|
||||
ctx,
|
||||
appServiceID,
|
||||
eventJSON,
|
||||
-1, // No transaction ID yet
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
|
||||
// before sending them to an AppService. Referenced before sending to make sure
|
||||
// we aren't constructing multiple transactions with the same events.
|
||||
func (s *eventsStatements) updateTxnIDForEvents(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
maxID, txnID int,
|
||||
) (err error) {
|
||||
_, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
|
||||
return
|
||||
}
|
||||
|
||||
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
|
||||
func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
eventTableID int,
|
||||
) (err error) {
|
||||
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
|
||||
return
|
||||
}
|
111
appservice/storage/sqlite3/storage.go
Normal file
111
appservice/storage/sqlite3/storage.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
// Import SQLite database driver
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewDatabase opens a new database
|
||||
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||
var result Database
|
||||
var err error
|
||||
if result.db, err = sql.Open("sqlite3", dataSourceName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (d *Database) prepare() error {
|
||||
if err := d.events.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.txnID.prepare(d.db)
|
||||
}
|
||||
|
||||
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
|
||||
// for a transaction worker to pull and later send to an application service.
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
event *gomatrixserverlib.Event,
|
||||
) error {
|
||||
return d.events.insertEvent(ctx, appServiceID, event)
|
||||
}
|
||||
|
||||
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
|
||||
// be sent to an application service given its ID.
|
||||
func (d *Database) GetEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
limit int,
|
||||
) (int, int, []gomatrixserverlib.Event, bool, error) {
|
||||
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
||||
}
|
||||
|
||||
// CountEventsWithAppServiceID returns the number of events destined for an
|
||||
// application service given its ID.
|
||||
func (d *Database) CountEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
|
||||
}
|
||||
|
||||
// UpdateTxnIDForEvents takes in an application service ID and a
|
||||
// and stores them in the DB, unless the pair already exists, in
|
||||
// which case it updates them.
|
||||
func (d *Database) UpdateTxnIDForEvents(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
maxID, txnID int,
|
||||
) error {
|
||||
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
|
||||
}
|
||||
|
||||
// RemoveEventsBeforeAndIncludingID removes all events from the database that
|
||||
// are less than or equal to a given maximum ID. IDs here are implemented as a
|
||||
// serial, thus this should always delete events in chronological order.
|
||||
func (d *Database) RemoveEventsBeforeAndIncludingID(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
eventTableID int,
|
||||
) error {
|
||||
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
|
||||
}
|
||||
|
||||
// GetLatestTxnID returns the latest available transaction id
|
||||
func (d *Database) GetLatestTxnID(
|
||||
ctx context.Context,
|
||||
) (int, error) {
|
||||
return d.txnID.selectTxnID(ctx)
|
||||
}
|
60
appservice/storage/sqlite3/txn_id_counter_table.go
Normal file
60
appservice/storage/sqlite3/txn_id_counter_table.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
const txnIDSchema = `
|
||||
-- Keeps a count of the current transaction ID
|
||||
CREATE TABLE IF NOT EXISTS appservice_counters (
|
||||
name TEXT PRIMARY KEY NOT NULL,
|
||||
last_id INTEGER DEFAULT 1
|
||||
);
|
||||
INSERT OR IGNORE INTO appservice_counters (name, last_id) VALUES('txn_id', 1);
|
||||
`
|
||||
|
||||
const selectTxnIDSQL = `
|
||||
SELECT last_id FROM appservice_counters WHERE name='txn_id';
|
||||
UPDATE appservice_counters SET last_id=last_id+1 WHERE name='txn_id';
|
||||
`
|
||||
|
||||
type txnStatements struct {
|
||||
selectTxnIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *txnStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(txnIDSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// selectTxnID selects the latest ascending transaction ID
|
||||
func (s *txnStatements) selectTxnID(
|
||||
ctx context.Context,
|
||||
) (txnID int, err error) {
|
||||
err = s.selectTxnIDStmt.QueryRowContext(ctx).Scan(&txnID)
|
||||
return
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -16,95 +16,33 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"net/url"
|
||||
|
||||
// Import postgres database driver
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/appservice/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/appservice/storage/sqlite3"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
type Database interface {
|
||||
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error
|
||||
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error)
|
||||
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
|
||||
UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
|
||||
RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
|
||||
GetLatestTxnID(ctx context.Context) (int, error)
|
||||
}
|
||||
|
||||
// NewDatabase opens a new database
|
||||
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||
var result Database
|
||||
var err error
|
||||
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||
return nil, err
|
||||
func NewDatabase(dataSourceName string) (Database, error) {
|
||||
uri, err := url.Parse(dataSourceName)
|
||||
if err != nil {
|
||||
return postgres.NewDatabase(dataSourceName)
|
||||
}
|
||||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
switch uri.Scheme {
|
||||
case "postgres":
|
||||
return postgres.NewDatabase(dataSourceName)
|
||||
case "file":
|
||||
return sqlite3.NewDatabase(dataSourceName)
|
||||
default:
|
||||
return postgres.NewDatabase(dataSourceName)
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (d *Database) prepare() error {
|
||||
if err := d.events.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.txnID.prepare(d.db)
|
||||
}
|
||||
|
||||
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
|
||||
// for a transaction worker to pull and later send to an application service.
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
event *gomatrixserverlib.Event,
|
||||
) error {
|
||||
return d.events.insertEvent(ctx, appServiceID, event)
|
||||
}
|
||||
|
||||
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
|
||||
// be sent to an application service given its ID.
|
||||
func (d *Database) GetEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
limit int,
|
||||
) (int, int, []gomatrixserverlib.Event, bool, error) {
|
||||
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
||||
}
|
||||
|
||||
// CountEventsWithAppServiceID returns the number of events destined for an
|
||||
// application service given its ID.
|
||||
func (d *Database) CountEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
|
||||
}
|
||||
|
||||
// UpdateTxnIDForEvents takes in an application service ID and a
|
||||
// and stores them in the DB, unless the pair already exists, in
|
||||
// which case it updates them.
|
||||
func (d *Database) UpdateTxnIDForEvents(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
maxID, txnID int,
|
||||
) error {
|
||||
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
|
||||
}
|
||||
|
||||
// RemoveEventsBeforeAndIncludingID removes all events from the database that
|
||||
// are less than or equal to a given maximum ID. IDs here are implemented as a
|
||||
// serial, thus this should always delete events in chronological order.
|
||||
func (d *Database) RemoveEventsBeforeAndIncludingID(
|
||||
ctx context.Context,
|
||||
appserviceID string,
|
||||
eventTableID int,
|
||||
) error {
|
||||
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
|
||||
}
|
||||
|
||||
// GetLatestTxnID returns the latest available transaction id
|
||||
func (d *Database) GetLatestTxnID(
|
||||
ctx context.Context,
|
||||
) (int, error) {
|
||||
return d.txnID.selectTxnID(ctx)
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ var (
|
|||
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
|
||||
// handles exponentially backing off in case the AS isn't currently available.
|
||||
func SetupTransactionWorkers(
|
||||
appserviceDB *storage.Database,
|
||||
appserviceDB storage.Database,
|
||||
workerStates []types.ApplicationServiceWorkerState,
|
||||
) error {
|
||||
// Create a worker that handles transmitting events to a single homeserver
|
||||
|
@ -58,7 +58,7 @@ func SetupTransactionWorkers(
|
|||
|
||||
// worker is a goroutine that sends any queued events to the application service
|
||||
// it is given.
|
||||
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||
func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||
log.WithFields(log.Fields{
|
||||
"appservice": ws.AppService.ID,
|
||||
}).Info("starting application service")
|
||||
|
@ -149,7 +149,7 @@ func backoff(ws *types.ApplicationServiceWorkerState, err error) {
|
|||
// transaction, and JSON-encodes the results.
|
||||
func createTransaction(
|
||||
ctx context.Context,
|
||||
db *storage.Database,
|
||||
db storage.Database,
|
||||
appserviceID string,
|
||||
) (
|
||||
transactionJSON []byte,
|
||||
|
|
115
mediaapi/storage/sqlite3/media_repository_table.go
Normal file
115
mediaapi/storage/sqlite3/media_repository_table.go
Normal file
|
@ -0,0 +1,115 @@
|
|||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const mediaSchema = `
|
||||
-- The media_repository table holds metadata for each media file stored and accessible to the local server,
|
||||
-- the actual file is stored separately.
|
||||
CREATE TABLE IF NOT EXISTS mediaapi_media_repository (
|
||||
-- The id used to refer to the media.
|
||||
-- For uploads to this server this is a base64-encoded sha256 hash of the file data
|
||||
-- For media from remote servers, this can be any unique identifier string
|
||||
media_id TEXT NOT NULL,
|
||||
-- The origin of the media as requested by the client. Should be a homeserver domain.
|
||||
media_origin TEXT NOT NULL,
|
||||
-- The MIME-type of the media file as specified when uploading.
|
||||
content_type TEXT NOT NULL,
|
||||
-- Size of the media file in bytes.
|
||||
file_size_bytes INTEGER NOT NULL,
|
||||
-- When the content was uploaded in UNIX epoch ms.
|
||||
creation_ts INTEGER NOT NULL,
|
||||
-- The file name with which the media was uploaded.
|
||||
upload_name TEXT NOT NULL,
|
||||
-- Alternate RFC 4648 unpadded base64 encoding string representation of a SHA-256 hash sum of the file data.
|
||||
base64hash TEXT NOT NULL,
|
||||
-- The user who uploaded the file. Should be a Matrix user ID.
|
||||
user_id TEXT NOT NULL
|
||||
);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_media_repository_index ON mediaapi_media_repository (media_id, media_origin);
|
||||
`
|
||||
|
||||
const insertMediaSQL = `
|
||||
INSERT INTO mediaapi_media_repository (media_id, media_origin, content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`
|
||||
|
||||
const selectMediaSQL = `
|
||||
SELECT content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id FROM mediaapi_media_repository WHERE media_id = $1 AND media_origin = $2
|
||||
`
|
||||
|
||||
type mediaStatements struct {
|
||||
insertMediaStmt *sql.Stmt
|
||||
selectMediaStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *mediaStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(mediaSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return statementList{
|
||||
{&s.insertMediaStmt, insertMediaSQL},
|
||||
{&s.selectMediaStmt, selectMediaSQL},
|
||||
}.prepare(db)
|
||||
}
|
||||
|
||||
func (s *mediaStatements) insertMedia(
|
||||
ctx context.Context, mediaMetadata *types.MediaMetadata,
|
||||
) error {
|
||||
mediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000)
|
||||
_, err := s.insertMediaStmt.ExecContext(
|
||||
ctx,
|
||||
mediaMetadata.MediaID,
|
||||
mediaMetadata.Origin,
|
||||
mediaMetadata.ContentType,
|
||||
mediaMetadata.FileSizeBytes,
|
||||
mediaMetadata.CreationTimestamp,
|
||||
mediaMetadata.UploadName,
|
||||
mediaMetadata.Base64Hash,
|
||||
mediaMetadata.UserID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *mediaStatements) selectMedia(
|
||||
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
|
||||
) (*types.MediaMetadata, error) {
|
||||
mediaMetadata := types.MediaMetadata{
|
||||
MediaID: mediaID,
|
||||
Origin: mediaOrigin,
|
||||
}
|
||||
err := s.selectMediaStmt.QueryRowContext(
|
||||
ctx, mediaMetadata.MediaID, mediaMetadata.Origin,
|
||||
).Scan(
|
||||
&mediaMetadata.ContentType,
|
||||
&mediaMetadata.FileSizeBytes,
|
||||
&mediaMetadata.CreationTimestamp,
|
||||
&mediaMetadata.UploadName,
|
||||
&mediaMetadata.Base64Hash,
|
||||
&mediaMetadata.UserID,
|
||||
)
|
||||
return &mediaMetadata, err
|
||||
}
|
38
mediaapi/storage/sqlite3/prepare.go
Normal file
38
mediaapi/storage/sqlite3/prepare.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// FIXME: This should be made common!
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// a statementList is a list of SQL statements to prepare and a pointer to where to store the resulting prepared statement.
|
||||
type statementList []struct {
|
||||
statement **sql.Stmt
|
||||
sql string
|
||||
}
|
||||
|
||||
// prepare the SQL for each statement in the list and assign the result to the prepared statement.
|
||||
func (s statementList) prepare(db *sql.DB) (err error) {
|
||||
for _, statement := range s {
|
||||
if *statement.statement, err = db.Prepare(statement.sql); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
36
mediaapi/storage/sqlite3/sql.go
Normal file
36
mediaapi/storage/sqlite3/sql.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
type statements struct {
|
||||
media mediaStatements
|
||||
thumbnail thumbnailStatements
|
||||
}
|
||||
|
||||
func (s *statements) prepare(db *sql.DB) (err error) {
|
||||
if err = s.media.prepare(db); err != nil {
|
||||
return
|
||||
}
|
||||
if err = s.thumbnail.prepare(db); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
106
mediaapi/storage/sqlite3/storage.go
Normal file
106
mediaapi/storage/sqlite3/storage.go
Normal file
|
@ -0,0 +1,106 @@
|
|||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
// Import the postgres database driver.
|
||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// Database is used to store metadata about a repository of media files.
|
||||
type Database struct {
|
||||
statements statements
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// Open opens a postgres database.
|
||||
func Open(dataSourceName string) (*Database, error) {
|
||||
var d Database
|
||||
var err error
|
||||
if d.db, err = sql.Open("sqlite3", dataSourceName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.statements.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
// StoreMediaMetadata inserts the metadata about the uploaded media into the database.
|
||||
// Returns an error if the combination of MediaID and Origin are not unique in the table.
|
||||
func (d *Database) StoreMediaMetadata(
|
||||
ctx context.Context, mediaMetadata *types.MediaMetadata,
|
||||
) error {
|
||||
return d.statements.media.insertMedia(ctx, mediaMetadata)
|
||||
}
|
||||
|
||||
// GetMediaMetadata returns metadata about media stored on this server.
|
||||
// The media could have been uploaded to this server or fetched from another server and cached here.
|
||||
// Returns nil metadata if there is no metadata associated with this media.
|
||||
func (d *Database) GetMediaMetadata(
|
||||
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
|
||||
) (*types.MediaMetadata, error) {
|
||||
mediaMetadata, err := d.statements.media.selectMedia(ctx, mediaID, mediaOrigin)
|
||||
if err != nil && err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return mediaMetadata, err
|
||||
}
|
||||
|
||||
// StoreThumbnail inserts the metadata about the thumbnail into the database.
|
||||
// Returns an error if the combination of MediaID and Origin are not unique in the table.
|
||||
func (d *Database) StoreThumbnail(
|
||||
ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata,
|
||||
) error {
|
||||
return d.statements.thumbnail.insertThumbnail(ctx, thumbnailMetadata)
|
||||
}
|
||||
|
||||
// GetThumbnail returns metadata about a specific thumbnail.
|
||||
// The media could have been uploaded to this server or fetched from another server and cached here.
|
||||
// Returns nil metadata if there is no metadata associated with this thumbnail.
|
||||
func (d *Database) GetThumbnail(
|
||||
ctx context.Context,
|
||||
mediaID types.MediaID,
|
||||
mediaOrigin gomatrixserverlib.ServerName,
|
||||
width, height int,
|
||||
resizeMethod string,
|
||||
) (*types.ThumbnailMetadata, error) {
|
||||
thumbnailMetadata, err := d.statements.thumbnail.selectThumbnail(
|
||||
ctx, mediaID, mediaOrigin, width, height, resizeMethod,
|
||||
)
|
||||
if err != nil && err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return thumbnailMetadata, err
|
||||
}
|
||||
|
||||
// GetThumbnails returns metadata about all thumbnails for a specific media stored on this server.
|
||||
// The media could have been uploaded to this server or fetched from another server and cached here.
|
||||
// Returns nil metadata if there are no thumbnails associated with this media.
|
||||
func (d *Database) GetThumbnails(
|
||||
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
|
||||
) ([]*types.ThumbnailMetadata, error) {
|
||||
thumbnails, err := d.statements.thumbnail.selectThumbnails(ctx, mediaID, mediaOrigin)
|
||||
if err != nil && err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return thumbnails, err
|
||||
}
|
162
mediaapi/storage/sqlite3/thumbnail_table.go
Normal file
162
mediaapi/storage/sqlite3/thumbnail_table.go
Normal file
|
@ -0,0 +1,162 @@
|
|||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const thumbnailSchema = `
|
||||
-- The mediaapi_thumbnail table holds metadata for each thumbnail file stored and accessible to the local server,
|
||||
-- the actual file is stored separately.
|
||||
CREATE TABLE IF NOT EXISTS mediaapi_thumbnail (
|
||||
media_id TEXT NOT NULL,
|
||||
media_origin TEXT NOT NULL,
|
||||
content_type TEXT NOT NULL,
|
||||
file_size_bytes INTEGER NOT NULL,
|
||||
creation_ts INTEGER NOT NULL,
|
||||
width INTEGER NOT NULL,
|
||||
height INTEGER NOT NULL,
|
||||
resize_method TEXT NOT NULL
|
||||
);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_thumbnail_index ON mediaapi_thumbnail (media_id, media_origin, width, height, resize_method);
|
||||
`
|
||||
|
||||
const insertThumbnailSQL = `
|
||||
INSERT INTO mediaapi_thumbnail (media_id, media_origin, content_type, file_size_bytes, creation_ts, width, height, resize_method)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`
|
||||
|
||||
// Note: this selects one specific thumbnail
|
||||
const selectThumbnailSQL = `
|
||||
SELECT content_type, file_size_bytes, creation_ts FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2 AND width = $3 AND height = $4 AND resize_method = $5
|
||||
`
|
||||
|
||||
// Note: this selects all thumbnails for a media_origin and media_id
|
||||
const selectThumbnailsSQL = `
|
||||
SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2
|
||||
`
|
||||
|
||||
type thumbnailStatements struct {
|
||||
insertThumbnailStmt *sql.Stmt
|
||||
selectThumbnailStmt *sql.Stmt
|
||||
selectThumbnailsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *thumbnailStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(thumbnailSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return statementList{
|
||||
{&s.insertThumbnailStmt, insertThumbnailSQL},
|
||||
{&s.selectThumbnailStmt, selectThumbnailSQL},
|
||||
{&s.selectThumbnailsStmt, selectThumbnailsSQL},
|
||||
}.prepare(db)
|
||||
}
|
||||
|
||||
func (s *thumbnailStatements) insertThumbnail(
|
||||
ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata,
|
||||
) error {
|
||||
thumbnailMetadata.MediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000)
|
||||
_, err := s.insertThumbnailStmt.ExecContext(
|
||||
ctx,
|
||||
thumbnailMetadata.MediaMetadata.MediaID,
|
||||
thumbnailMetadata.MediaMetadata.Origin,
|
||||
thumbnailMetadata.MediaMetadata.ContentType,
|
||||
thumbnailMetadata.MediaMetadata.FileSizeBytes,
|
||||
thumbnailMetadata.MediaMetadata.CreationTimestamp,
|
||||
thumbnailMetadata.ThumbnailSize.Width,
|
||||
thumbnailMetadata.ThumbnailSize.Height,
|
||||
thumbnailMetadata.ThumbnailSize.ResizeMethod,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *thumbnailStatements) selectThumbnail(
|
||||
ctx context.Context,
|
||||
mediaID types.MediaID,
|
||||
mediaOrigin gomatrixserverlib.ServerName,
|
||||
width, height int,
|
||||
resizeMethod string,
|
||||
) (*types.ThumbnailMetadata, error) {
|
||||
thumbnailMetadata := types.ThumbnailMetadata{
|
||||
MediaMetadata: &types.MediaMetadata{
|
||||
MediaID: mediaID,
|
||||
Origin: mediaOrigin,
|
||||
},
|
||||
ThumbnailSize: types.ThumbnailSize{
|
||||
Width: width,
|
||||
Height: height,
|
||||
ResizeMethod: resizeMethod,
|
||||
},
|
||||
}
|
||||
err := s.selectThumbnailStmt.QueryRowContext(
|
||||
ctx,
|
||||
thumbnailMetadata.MediaMetadata.MediaID,
|
||||
thumbnailMetadata.MediaMetadata.Origin,
|
||||
thumbnailMetadata.ThumbnailSize.Width,
|
||||
thumbnailMetadata.ThumbnailSize.Height,
|
||||
thumbnailMetadata.ThumbnailSize.ResizeMethod,
|
||||
).Scan(
|
||||
&thumbnailMetadata.MediaMetadata.ContentType,
|
||||
&thumbnailMetadata.MediaMetadata.FileSizeBytes,
|
||||
&thumbnailMetadata.MediaMetadata.CreationTimestamp,
|
||||
)
|
||||
return &thumbnailMetadata, err
|
||||
}
|
||||
|
||||
func (s *thumbnailStatements) selectThumbnails(
|
||||
ctx context.Context, mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName,
|
||||
) ([]*types.ThumbnailMetadata, error) {
|
||||
rows, err := s.selectThumbnailsStmt.QueryContext(
|
||||
ctx, mediaID, mediaOrigin,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
var thumbnails []*types.ThumbnailMetadata
|
||||
for rows.Next() {
|
||||
thumbnailMetadata := types.ThumbnailMetadata{
|
||||
MediaMetadata: &types.MediaMetadata{
|
||||
MediaID: mediaID,
|
||||
Origin: mediaOrigin,
|
||||
},
|
||||
}
|
||||
err = rows.Scan(
|
||||
&thumbnailMetadata.MediaMetadata.ContentType,
|
||||
&thumbnailMetadata.MediaMetadata.FileSizeBytes,
|
||||
&thumbnailMetadata.MediaMetadata.CreationTimestamp,
|
||||
&thumbnailMetadata.ThumbnailSize.Width,
|
||||
&thumbnailMetadata.ThumbnailSize.Height,
|
||||
&thumbnailMetadata.ThumbnailSize.ResizeMethod,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
thumbnails = append(thumbnails, &thumbnailMetadata)
|
||||
}
|
||||
|
||||
return thumbnails, rows.Err()
|
||||
}
|
|
@ -19,6 +19,7 @@ import (
|
|||
"net/url"
|
||||
|
||||
"github.com/matrix-org/dendrite/mediaapi/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/mediaapi/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
@ -40,6 +41,8 @@ func Open(dataSourceName string) (Database, error) {
|
|||
switch uri.Scheme {
|
||||
case "postgres":
|
||||
return postgres.Open(dataSourceName)
|
||||
case "file":
|
||||
return sqlite3.Open(dataSourceName)
|
||||
default:
|
||||
return postgres.Open(dataSourceName)
|
||||
}
|
||||
|
|
|
@ -39,26 +39,15 @@ var editableAttributes = []string{
|
|||
const publicRoomsSchema = `
|
||||
-- Stores all of the rooms with data needed to create the server's room directory
|
||||
CREATE TABLE IF NOT EXISTS publicroomsapi_public_rooms(
|
||||
-- The room's ID
|
||||
room_id TEXT NOT NULL PRIMARY KEY,
|
||||
-- Number of joined members in the room
|
||||
joined_members INTEGER NOT NULL DEFAULT 0,
|
||||
-- Aliases of the room (empty array if none)
|
||||
aliases TEXT[] NOT NULL DEFAULT '{}'::TEXT[],
|
||||
-- Canonical alias of the room (empty string if none)
|
||||
aliases TEXT NOT NULL DEFAULT '',
|
||||
canonical_alias TEXT NOT NULL DEFAULT '',
|
||||
-- Name of the room (empty string if none)
|
||||
name TEXT NOT NULL DEFAULT '',
|
||||
-- Topic of the room (empty string if none)
|
||||
topic TEXT NOT NULL DEFAULT '',
|
||||
-- Is the room world readable?
|
||||
world_readable BOOLEAN NOT NULL DEFAULT false,
|
||||
-- Can guest join the room?
|
||||
guest_can_join BOOLEAN NOT NULL DEFAULT false,
|
||||
-- URL of the room avatar (empty string if none)
|
||||
avatar_url TEXT NOT NULL DEFAULT '',
|
||||
-- Visibility of the room: true means the room is publicly visible, false
|
||||
-- means the room is private
|
||||
visibility BOOLEAN NOT NULL DEFAULT false
|
||||
);
|
||||
`
|
||||
|
@ -71,13 +60,13 @@ const selectPublicRoomsSQL = "" +
|
|||
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
|
||||
" FROM publicroomsapi_public_rooms WHERE visibility = true" +
|
||||
" ORDER BY joined_members DESC" +
|
||||
" OFFSET $1"
|
||||
" LIMIT 30 OFFSET $1"
|
||||
|
||||
const selectPublicRoomsWithLimitSQL = "" +
|
||||
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
|
||||
" FROM publicroomsapi_public_rooms WHERE visibility = true" +
|
||||
" ORDER BY joined_members DESC" +
|
||||
" OFFSET $1 LIMIT $2"
|
||||
" LIMIT $2 OFFSET $1"
|
||||
|
||||
const selectPublicRoomsWithFilterSQL = "" +
|
||||
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
|
||||
|
@ -85,9 +74,9 @@ const selectPublicRoomsWithFilterSQL = "" +
|
|||
" WHERE visibility = true" +
|
||||
" AND (LOWER(name) LIKE LOWER($1)" +
|
||||
" OR LOWER(topic) LIKE LOWER($1)" +
|
||||
" OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" +
|
||||
" OR LOWER(aliases) LIKE LOWER($1))" + // TODO: Is there a better way to search aliases?
|
||||
" ORDER BY joined_members DESC" +
|
||||
" OFFSET $2"
|
||||
" LIMIT 30 OFFSET $2"
|
||||
|
||||
const selectPublicRoomsWithLimitAndFilterSQL = "" +
|
||||
"SELECT room_id, joined_members, aliases, canonical_alias, name, topic, world_readable, guest_can_join, avatar_url" +
|
||||
|
@ -95,9 +84,9 @@ const selectPublicRoomsWithLimitAndFilterSQL = "" +
|
|||
" WHERE visibility = true" +
|
||||
" AND (LOWER(name) LIKE LOWER($1)" +
|
||||
" OR LOWER(topic) LIKE LOWER($1)" +
|
||||
" OR LOWER(ARRAY_TO_STRING(aliases, ',')) LIKE LOWER($1))" +
|
||||
" OR LOWER(aliases) LIKE LOWER($1))" + // TODO: Is there a better way to search aliases?
|
||||
" ORDER BY joined_members DESC" +
|
||||
" OFFSET $2 LIMIT $3"
|
||||
" LIMIT $3 OFFSET $2"
|
||||
|
||||
const selectRoomVisibilitySQL = "" +
|
||||
"SELECT visibility FROM publicroomsapi_public_rooms" +
|
||||
|
@ -187,7 +176,7 @@ func (s *publicRoomsStatements) selectPublicRooms(
|
|||
)
|
||||
} else {
|
||||
rows, err = s.selectPublicRoomsWithLimitAndFilterStmt.QueryContext(
|
||||
ctx, pattern, offset, limit,
|
||||
ctx, pattern, limit, offset,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
|
@ -195,7 +184,7 @@ func (s *publicRoomsStatements) selectPublicRooms(
|
|||
rows, err = s.selectPublicRoomsStmt.QueryContext(ctx, offset)
|
||||
} else {
|
||||
rows, err = s.selectPublicRoomsWithLimitStmt.QueryContext(
|
||||
ctx, offset, limit,
|
||||
ctx, limit, offset,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/publicroomsapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
@ -43,6 +44,8 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (Database, error) {
|
|||
switch uri.Scheme {
|
||||
case "postgres":
|
||||
return postgres.NewPublicRoomsServerDatabase(dataSourceName)
|
||||
case "file":
|
||||
return sqlite3.NewPublicRoomsServerDatabase(dataSourceName)
|
||||
default:
|
||||
return postgres.NewPublicRoomsServerDatabase(dataSourceName)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue