De-map device list positions in streaming tokens (#1642)

* De-map device list positions in streaming tokens

* Fix lint error

* Tweak toOffset
This commit is contained in:
Neil Alexander 2020-12-15 15:09:10 +00:00 committed by GitHub
parent 98ebbd01e5
commit 38318b0f16
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 99 deletions

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync" syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -115,12 +114,10 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
} }
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.StreamingToken{ posUpdate := types.StreamingToken{
Logs: map[string]*types.LogPosition{ DeviceListPosition: types.LogPosition{
syncinternal.DeviceListLogName: {
Offset: msg.Offset, Offset: msg.Offset,
Partition: msg.Partition, Partition: msg.Partition,
}, },
},
} }
for userID := range queryRes.UserIDsToCount { for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)

View file

@ -73,15 +73,13 @@ func DeviceListCatchup(
offset = sarama.OffsetOldest offset = sarama.OffsetOldest
// Extract partition/offset from sync token // Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := from.Log(DeviceListLogName) if !from.DeviceListPosition.IsEmpty() {
if logOffset != nil { partition = from.DeviceListPosition.Partition
partition = logOffset.Partition offset = from.DeviceListPosition.Offset
offset = logOffset.Offset
} }
var toOffset int64 var toOffset int64
toOffset = sarama.OffsetNewest toOffset = sarama.OffsetNewest
toLog := to.Log(DeviceListLogName) if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 {
if toLog != nil && toLog.Offset > 0 {
toOffset = toLog.Offset toOffset = toLog.Offset
} }
var queryRes api.QueryKeyChangesResponse var queryRes api.QueryKeyChangesResponse
@ -130,10 +128,10 @@ func DeviceListCatchup(
} }
} }
// set the new token // set the new token
to.SetLog(DeviceListLogName, &types.LogPosition{ to.DeviceListPosition = types.LogPosition{
Partition: queryRes.Partition, Partition: queryRes.Partition,
Offset: queryRes.Offset, Offset: queryRes.Offset,
}) }
res.NextBatch = to.String() res.NextBatch = to.String()
return hasNew, nil return hasNew, nil

View file

@ -18,12 +18,10 @@ var (
syncingUser = "@alice:localhost" syncingUser = "@alice:localhost"
emptyToken = types.StreamingToken{} emptyToken = types.StreamingToken{}
newestToken = types.StreamingToken{ newestToken = types.StreamingToken{
Logs: map[string]*types.LogPosition{ DeviceListPosition: types.LogPosition{
DeviceListLogName: {
Offset: sarama.OffsetNewest, Offset: sarama.OffsetNewest,
Partition: 0, Partition: 0,
}, },
},
} }
) )

View file

@ -17,7 +17,6 @@ package types
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"sort"
"strconv" "strconv"
"strings" "strings"
@ -45,6 +44,10 @@ type LogPosition struct {
Offset int64 Offset int64
} }
func (p *LogPosition) IsEmpty() bool {
return p.Offset == 0
}
// IsAfter returns true if this position is after `lp`. // IsAfter returns true if this position is after `lp`.
func (p *LogPosition) IsAfter(lp *LogPosition) bool { func (p *LogPosition) IsAfter(lp *LogPosition) bool {
if lp == nil { if lp == nil {
@ -110,22 +113,7 @@ type StreamingToken struct {
TypingPosition StreamPosition TypingPosition StreamPosition
ReceiptPosition StreamPosition ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition SendToDevicePosition StreamPosition
Logs map[string]*LogPosition DeviceListPosition LogPosition
}
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
if t.Logs == nil {
t.Logs = make(map[string]*LogPosition)
}
t.Logs[name] = lp
}
func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.Logs[name]
if !ok {
return nil
}
return l
} }
func (t StreamingToken) String() string { func (t StreamingToken) String() string {
@ -134,14 +122,10 @@ func (t StreamingToken) String() string {
t.PDUPosition, t.TypingPosition, t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition, t.ReceiptPosition, t.SendToDevicePosition,
) )
var logStrings []string if dl := t.DeviceListPosition; !dl.IsEmpty() {
for name, lp := range t.Logs { posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
logStrings = append(logStrings, logStr)
} }
sort.Strings(logStrings) return posStr
// E.g s11_22_33_44.dl0-134.ab1-441
return strings.Join(append([]string{posStr}, logStrings...), ".")
} }
// IsAfter returns true if ANY position in this token is greater than `other`. // IsAfter returns true if ANY position in this token is greater than `other`.
@ -155,21 +139,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true return true
case t.SendToDevicePosition > other.SendToDevicePosition: case t.SendToDevicePosition > other.SendToDevicePosition:
return true return true
} case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
for name := range t.Logs {
otherLog := other.Log(name)
if otherLog == nil {
continue
}
if t.Logs[name].IsAfter(otherLog) {
return true return true
} }
}
return false return false
} }
func (t *StreamingToken) IsEmpty() bool { func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition == 0 && t.DeviceListPosition.IsEmpty()
} }
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken. // WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -187,15 +164,8 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
ret.ReceiptPosition = other.ReceiptPosition ret.ReceiptPosition = other.ReceiptPosition
case other.SendToDevicePosition > 0: case other.SendToDevicePosition > 0:
ret.SendToDevicePosition = other.SendToDevicePosition ret.SendToDevicePosition = other.SendToDevicePosition
} case other.DeviceListPosition.Offset > 0:
ret.Logs = make(map[string]*LogPosition) ret.DeviceListPosition = other.DeviceListPosition
for name := range t.Logs {
otherLog := other.Log(name)
if otherLog == nil {
continue
}
copy := *otherLog
ret.Logs[name] = &copy
} }
return ret return ret
} }
@ -294,29 +264,30 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
TypingPosition: positions[1], TypingPosition: positions[1],
ReceiptPosition: positions[2], ReceiptPosition: positions[2],
SendToDevicePosition: positions[3], SendToDevicePosition: positions[3],
Logs: make(map[string]*LogPosition),
} }
// dl-0-1234 // dl-0-1234
// $log_name-$partition-$offset // $log_name-$partition-$offset
for _, logStr := range categories[1:] { for _, logStr := range categories[1:] {
segments := strings.Split(logStr, "-") segments := strings.Split(logStr, "-")
if len(segments) != 3 { if len(segments) != 3 {
err = fmt.Errorf("token %s - invalid log: %s", tok, logStr) err = fmt.Errorf("invalid log position %q", logStr)
return return
} }
var partition int64 switch segments[0] {
partition, err = strconv.ParseInt(segments[1], 10, 32) case "dl":
if err != nil { // Device list syncing
var partition, offset int
if partition, err = strconv.Atoi(segments[1]); err != nil {
return return
} }
var offset int64 if offset, err = strconv.Atoi(segments[2]); err != nil {
offset, err = strconv.ParseInt(segments[2], 10, 64)
if err != nil {
return return
} }
token.Logs[segments[0]] = &LogPosition{ token.DeviceListPosition.Partition = int32(partition)
Partition: int32(partition), token.DeviceListPosition.Offset = int64(offset)
Offset: offset, default:
err = fmt.Errorf("unrecognised token type %q", segments[0])
return
} }
} }
return token, nil return token, nil

View file

@ -12,30 +12,14 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{ tests := map[string]*StreamingToken{
"s4_0_0_0": { "s4_0_0_0": {
PDUPosition: 4, PDUPosition: 4,
Logs: make(map[string]*LogPosition),
}, },
"s4_0_0_0.dl-0-123": { "s4_0_0_0.dl-0-123": {
PDUPosition: 4, PDUPosition: 4,
Logs: map[string]*LogPosition{ DeviceListPosition: LogPosition{
"dl": {
Partition: 0, Partition: 0,
Offset: 123, Offset: 123,
}, },
}, },
},
"s4_0_0_0.ab-1-14419482332.dl-0-123": {
PDUPosition: 4,
Logs: map[string]*LogPosition{
"ab": {
Partition: 1,
Offset: 14419482332,
},
"dl": {
Partition: 0,
Offset: 123,
},
},
},
} }
for tok, want := range tests { for tok, want := range tests {
got, err := NewStreamTokenFromString(tok) got, err := NewStreamTokenFromString(tok)
@ -58,9 +42,9 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) { func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{ shouldPass := map[string]string{
"s4_0_0_0": StreamingToken{4, 0, 0, 0, nil}.String(), "s4_0_0_0": StreamingToken{4, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0": StreamingToken{3, 1, 0, 0, nil}.String(), "s3_1_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3": StreamingToken{3, 1, 2, 3, nil}.String(), "s3_1_2_3": StreamingToken{3, 1, 2, 3, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(), "t3_1": TopologyToken{3, 1}.String(),
} }