mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-13 23:02:46 +00:00
Reduce CPU usage of SelectStateInRange (#2038)
This commit is contained in:
parent
b4a007ecce
commit
837f50ac89
2 changed files with 10 additions and 8 deletions
|
@ -116,7 +116,7 @@ const updateEventJSONSQL = "" +
|
|||
|
||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
|
||||
|
@ -221,13 +221,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
)
|
||||
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Sanity check for deleted state and whine if we see it. We don't need to do anything
|
||||
|
@ -243,7 +244,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
needSet := stateNeeded[ev.RoomID()]
|
||||
|
@ -258,7 +259,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
|
||||
eventIDToEvent[ev.EventID()] = types.StreamEvent{
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
ExcludeFromSync: excludeFromSync,
|
||||
|
|
|
@ -81,7 +81,7 @@ const updateEventJSONSQL = "" +
|
|||
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
||||
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2)" +
|
||||
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
|
||||
|
@ -173,13 +173,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDsJSON string
|
||||
delIDsJSON string
|
||||
)
|
||||
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
|
@ -201,7 +202,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
needSet := stateNeeded[ev.RoomID()]
|
||||
|
@ -216,7 +217,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
|
||||
eventIDToEvent[ev.EventID()] = types.StreamEvent{
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
StreamPosition: streamPos,
|
||||
ExcludeFromSync: excludeFromSync,
|
||||
|
|
Loading…
Reference in a new issue