mirror of
https://github.com/matrix-org/dendrite
synced 2025-01-20 17:04:00 +00:00
Handle duplicate kafka messages (#301)
The way we store the partition offsets for kafka streams means that when we start after a crash we may get the last message we processed again. This means that we have to be careful to ensure that the processing handles consecutive duplicates correctly.
This commit is contained in:
parent
1a026f16d5
commit
35b628f5bf
2 changed files with 27 additions and 8 deletions
|
@ -134,6 +134,14 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
return err
|
||||
}
|
||||
|
||||
if oldJoinedHosts == nil {
|
||||
// This means that there is nothing to update as this is a duplicate
|
||||
// message.
|
||||
// This can happen if dendrite crashed between reading the message and
|
||||
// persisting the stream position.
|
||||
return nil
|
||||
}
|
||||
|
||||
if ore.SendAsServer == api.DoNotSendToOtherServers {
|
||||
// Ignore event that we don't need to send anywhere.
|
||||
return nil
|
||||
|
@ -146,13 +154,9 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
}
|
||||
|
||||
// Send the event.
|
||||
if err = s.queues.SendEvent(
|
||||
return s.queues.SendEvent(
|
||||
&ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
)
|
||||
}
|
||||
|
||||
// joinedHostsAtEvent works out a list of matrix servers that were joined to
|
||||
|
|
|
@ -62,7 +62,10 @@ func (d *Database) prepare() error {
|
|||
}
|
||||
|
||||
// UpdateRoom updates the joined hosts for a room and returns what the joined
|
||||
// hosts were before the update.
|
||||
// hosts were before the update, or nil if this was a duplicate message.
|
||||
// This is called when we receive a message from kafka, so we pass in
|
||||
// oldEventID and newEventID to check that we haven't missed any messages or
|
||||
// this isn't a duplicate message.
|
||||
func (d *Database) UpdateRoom(
|
||||
ctx context.Context,
|
||||
roomID, oldEventID, newEventID string,
|
||||
|
@ -70,22 +73,34 @@ func (d *Database) UpdateRoom(
|
|||
removeHosts []string,
|
||||
) (joinedHosts []types.JoinedHost, err error) {
|
||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||
if err = d.insertRoom(ctx, txn, roomID); err != nil {
|
||||
err = d.insertRoom(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if lastSentEventID == newEventID {
|
||||
// We've handled this message before, so let's just ignore it.
|
||||
// We can only get a duplicate for the last message we processed,
|
||||
// so its enough just to compare the newEventID with lastSentEventID
|
||||
return nil
|
||||
}
|
||||
|
||||
if lastSentEventID != oldEventID {
|
||||
return types.EventIDMismatchError{
|
||||
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
|
||||
}
|
||||
}
|
||||
|
||||
joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, add := range addHosts {
|
||||
err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue