diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go new file mode 100644 index 000000000..c3be0a3a2 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -0,0 +1,48 @@ +package storage + +import ( + "database/sql" + + "github.com/lib/pq" +) + +const outputRoomEventsSchema = ` +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS output_room_events ( + -- An incrementing ID which denotes the position in the log that this event resides at. + -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. + -- This isn't a problem for us since we just want to order by this field. + id BIGSERIAL PRIMARY KEY, + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- A list of event IDs which represent a delta of added/removed room state. + add_state_ids TEXT[] NOT NULL, + remove_state_ids TEXT[] NOT NULL +); +` + +const insertEventSQL = "" + + "INSERT INTO output_room_events (room_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4)" + +type outputRoomEventsStatements struct { + insertEventStmt *sql.Stmt +} + +func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(outputRoomEventsSchema) + if err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + return +} + +// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. +func (s *outputRoomEventsStatements) InsertEvent(roomID string, eventJSON []byte, addState, removeState []string) error { + _, err := s.insertEventStmt.Exec(roomID, eventJSON, pq.StringArray(addState), pq.StringArray(removeState)) + return err +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index 7d8f4f504..0a911d98f 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -2,16 +2,17 @@ package storage import ( "database/sql" - - "github.com/matrix-org/dendrite/common" // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" ) // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { db *sql.DB partitions common.PartitionOffsetStatements + events outputRoomEventsStatements } // NewSyncServerDatabase creates a new sync server database @@ -25,7 +26,17 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err = partitions.Prepare(db); err != nil { return nil, err } - return &SyncServerDatabase{db, partitions}, nil + events := outputRoomEventsStatements{} + if err = events.prepare(db); err != nil { + return nil, err + } + return &SyncServerDatabase{db, partitions, events}, nil +} + +// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races +// when generating the stream position for this event. Returns an error if there was a problem inserting this event. +func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) error { + return d.events.InsertEvent(ev.RoomID(), ev.JSON(), addStateEventIDs, removeStateEventIDs) } // PartitionOffsets implements common.PartitionStorer diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 71c6ec887..1c97c6a3b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -1,19 +1,25 @@ package sync import ( + "encoding/json" + log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/storage" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" sarama "gopkg.in/Shopify/sarama.v1" ) // Server contains all the logic for running a sync server type Server struct { roomServerConsumer *common.ContinualConsumer + db *storage.SyncServerDatabase } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) { +func NewServer(cfg *config.Sync, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -26,6 +32,7 @@ func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) } s := &Server{ roomServerConsumer: &consumer, + db: store, } consumer.ProcessMessage = s.onMessage @@ -38,6 +45,29 @@ func (s *Server) Start() error { } func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { - log.WithField("key", string(msg.Key)).WithField("val", string(msg.Value)).Info("Recv") + // Parse out the event JSON + var output api.OutputRoomEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) + if err != nil { + log.WithError(err).Errorf("roomserver output log: event parse failure") + return nil + } + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + }).Info("received event from roomserver") + + if err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs); err != nil { + // panic rather than continue with an inconsistent database + log.WithError(err).WithField("OutputRoomEvent", output).Panicf("roomserver output log: write event failure") + return nil + } + return nil }