Parallelise PDU stream fetching for complete sync

This commit is contained in:
Neil Alexander 2021-01-29 12:53:18 +00:00
parent 6d1c6f29e0
commit 9b6e807f82
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -52,17 +52,25 @@ func (p *PDUStreamProvider) CompleteSync(
eventFilter := req.Filter.Room.Timeline eventFilter := req.Filter.Room.Timeline
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
var reqMutex sync.Mutex
var reqWaitGroup sync.WaitGroup
reqWaitGroup.add(len(joinedRooms))
for _, roomID := range joinedRoomIDs { for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse go func() {
jr, err = p.getJoinResponseForCompleteSync( defer reqWaitGroup.Done()
ctx, roomID, r, &stateFilter, &eventFilter, req.Device, var jr *types.JoinResponse
) jr, err = p.getJoinResponseForCompleteSync(
if err != nil { ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") )
return from if err != nil {
} req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
req.Response.Rooms.Join[roomID] = *jr return from
req.Rooms[roomID] = gomatrixserverlib.Join }
reqMutex.Lock()
defer reqMutex.Unlock()
req.Response.Rooms.Join[roomID] = *jr
req.Rooms[roomID] = gomatrixserverlib.Join
}()
} }
// Add peeked rooms. // Add peeked rooms.