Fix initial sync (#1465)

* Fix complete sync check

* Remove unnecessary 'since' copy

* Fix failing test

* Un-whitelist a couple of tests

Co-authored-by: Kegan Dougal <kegan@matrix.org>
This commit is contained in:
Neil Alexander 2020-10-02 12:50:58 +01:00 committed by GitHub
parent 1b29e5771f
commit fb9a8f215b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 19 deletions

View file

@ -670,7 +670,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
// nolint:nakedret // nolint:nakedret
func (d *Database) getResponseWithPDUsForCompleteSync( func (d *Database) getResponseWithPDUsForCompleteSync(
ctx context.Context, res *types.Response, ctx context.Context, res *types.Response,
userID string, deviceID string, userID string, device userapi.Device,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
) ( ) (
toPos types.StreamingToken, toPos types.StreamingToken,
@ -712,7 +712,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = d.getJoinResponseForCompleteSync( jr, err = d.getJoinResponseForCompleteSync(
ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, device,
) )
if err != nil { if err != nil {
return return
@ -721,7 +721,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
} }
// Add peeked rooms. // Add peeked rooms.
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r) peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
if err != nil { if err != nil {
return return
} }
@ -729,7 +729,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
if !peek.Deleted { if !peek.Deleted {
var jr *types.JoinResponse var jr *types.JoinResponse
jr, err = d.getJoinResponseForCompleteSync( jr, err = d.getJoinResponseForCompleteSync(
ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, device,
) )
if err != nil { if err != nil {
return return
@ -751,7 +751,7 @@ func (d *Database) getJoinResponseForCompleteSync(
roomID string, roomID string,
r types.Range, r types.Range,
stateFilter *gomatrixserverlib.StateFilter, stateFilter *gomatrixserverlib.StateFilter,
numRecentEventsPerRoom int, numRecentEventsPerRoom int, device userapi.Device,
) (jr *types.JoinResponse, err error) { ) (jr *types.JoinResponse, err error) {
var stateEvents []gomatrixserverlib.HeaderedEvent var stateEvents []gomatrixserverlib.HeaderedEvent
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
@ -784,8 +784,9 @@ func (d *Database) getJoinResponseForCompleteSync(
} }
// We don't include a device here as we don't need to send down // We don't include a device here as we don't need to send down
// transaction IDs for complete syncs // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) // "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents) stateEvents = removeDuplicates(stateEvents, recentEvents)
jr = types.NewJoinResponse() jr = types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatchStr jr.Timeline.PrevBatch = prevBatchStr
@ -800,7 +801,7 @@ func (d *Database) CompleteSync(
device userapi.Device, numRecentEventsPerRoom int, device userapi.Device, numRecentEventsPerRoom int,
) (*types.Response, error) { ) (*types.Response, error) {
toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
ctx, res, device.UserID, device.ID, numRecentEventsPerRoom, ctx, res, device.UserID, device, numRecentEventsPerRoom,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err) return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)

View file

@ -197,19 +197,14 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) { func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) {
res := types.NewResponse() res := types.NewResponse()
since := types.NewStreamToken(0, 0, nil)
if req.since != nil {
since = *req.since
}
// See if we have any new tasks to do for the send-to-device messaging. // See if we have any new tasks to do for the send-to-device messaging.
events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, since) events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, *req.since)
if err != nil { if err != nil {
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err) return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
} }
// TODO: handle ignored users // TODO: handle ignored users
if req.since == nil { if req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0 {
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
if err != nil { if err != nil {
return res, fmt.Errorf("rp.db.CompleteSync: %w", err) return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
@ -226,7 +221,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil { if err != nil {
return res, fmt.Errorf("rp.appendAccountData: %w", err) return res, fmt.Errorf("rp.appendAccountData: %w", err)
} }
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos) res, err = rp.appendDeviceLists(res, req.device.UserID, *req.since, latestPos)
if err != nil { if err != nil {
return res, fmt.Errorf("rp.appendDeviceLists: %w", err) return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
} }
@ -240,7 +235,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Then add the updates into the sync response. // Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 { if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database. // Handle the updates and deletions in the database.
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, since) err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, *req.since)
if err != nil { if err != nil {
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err) return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
} }

View file

@ -396,8 +396,6 @@ GET /rooms/:room_id/state fetches entire room state
Setting room topic reports m.room.topic to myself Setting room topic reports m.room.topic to myself
setting 'm.room.name' respects room powerlevel setting 'm.room.name' respects room powerlevel
Syncing a new room with a large timeline limit isn't limited Syncing a new room with a large timeline limit isn't limited
Left rooms appear in the leave section of sync
Banned rooms appear in the leave section of sync
Getting state checks the events requested belong to the room Getting state checks the events requested belong to the room
Getting state IDs checks the events requested belong to the room Getting state IDs checks the events requested belong to the room
Can invite users to invite-only rooms Can invite users to invite-only rooms