Add AddsState helper function

This commit is contained in:
Neil Alexander 2022-03-07 16:30:57 +00:00
parent 84a940715f
commit c9fbe45475
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 28 additions and 30 deletions

View file

@ -87,17 +87,12 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
return true return true
} }
events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} events, err := output.NewRoomEvent.AddsState(ctx, s.rsAPI)
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 { if err != nil {
eventsReq := &api.QueryEventsByIDRequest{ log.WithError(err).Errorf("roomserver output log: failed to get state events")
EventIDs: output.NewRoomEvent.AddsStateEventIDs, return false
}
eventsRes := &api.QueryEventsByIDResponse{}
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
return false
}
events = append(events, eventsRes.Events...)
} }
events = append(events, output.NewRoomEvent.Event)
// Send event to any relevant application services // Send event to any relevant application services
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {

View file

@ -146,28 +146,13 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
eventsRes := &api.QueryEventsByIDResponse{} stateEvents, err := ore.AddsState(s.ctx, s.rsAPI)
if len(ore.AddsStateEventIDs) > 0 { if err != nil {
eventsReq := &api.QueryEventsByIDRequest{ return fmt.Errorf("ore.AddsState: %w", err)
EventIDs: ore.AddsStateEventIDs,
}
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
}
found := false
for _, event := range eventsRes.Events {
if event.EventID() == ore.Event.EventID() {
found = true
break
}
}
if !found {
eventsRes.Events = append(eventsRes.Events, ore.Event)
}
} }
stateEvents = append(stateEvents, ore.Event)
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events)) addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(stateEvents))
if err != nil { if err != nil {
return err return err
} }

View file

@ -15,6 +15,9 @@
package api package api
import ( import (
"context"
"fmt"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -163,6 +166,21 @@ type OutputNewRoomEvent struct {
TransactionID *TransactionID `json:"transaction_id,omitempty"` TransactionID *TransactionID `json:"transaction_id,omitempty"`
} }
// AddsState asks the roomserver API for events specified in `adds_state_event_ids`.
// The slice returned contains the output room event itself in all cases.
func (o *OutputNewRoomEvent) AddsState(ctx context.Context, rsAPI RoomserverInternalAPI) ([]*gomatrixserverlib.HeaderedEvent, error) {
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(o.AddsStateEventIDs))
eventsReq := &QueryEventsByIDRequest{
EventIDs: o.AddsStateEventIDs,
}
eventsRes := &QueryEventsByIDResponse{}
if err := rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
return nil, fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
}
events = append(events, eventsRes.Events...)
return events, nil
}
// An OutputOldRoomEvent is written when the roomserver receives an old event. // An OutputOldRoomEvent is written when the roomserver receives an old event.
// This will typically happen as a result of getting either missing events // This will typically happen as a result of getting either missing events
// or backfilling. Downstream components may wish to send these events to // or backfilling. Downstream components may wish to send these events to