mirror of
https://github.com/matrix-org/dendrite
synced 2025-01-05 17:58:42 +00:00
Roomserver producers package (#2546)
* Give the roomserver a producers package * Change init point * Populate ACLs API * Fix build issues * `RoomEventProducer` naming
This commit is contained in:
parent
89cd0e8fc1
commit
b50a24c666
11 changed files with 137 additions and 103 deletions
|
@ -216,11 +216,10 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = api.SendEvents(ctx, r.RSAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
|
err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,8 +12,10 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/perform"
|
"github.com/matrix-org/dendrite/roomserver/internal/perform"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/producers"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -49,17 +51,21 @@ type RoomserverInternalAPI struct {
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
Durable string
|
Durable string
|
||||||
InputRoomEventTopic string // JetStream topic for new input room events
|
InputRoomEventTopic string // JetStream topic for new input room events
|
||||||
OutputRoomEventTopic string // JetStream topic for new output room events
|
OutputProducer *producers.RoomEventProducer
|
||||||
PerspectiveServerNames []gomatrixserverlib.ServerName
|
PerspectiveServerNames []gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRoomserverAPI(
|
func NewRoomserverAPI(
|
||||||
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
|
processCtx *process.ProcessContext, cfg *config.RoomServer, roomserverDB storage.Database,
|
||||||
consumer nats.JetStreamContext, nc *nats.Conn,
|
js nats.JetStreamContext, nc *nats.Conn, inputRoomEventTopic string,
|
||||||
inputRoomEventTopic, outputRoomEventTopic string,
|
|
||||||
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
|
caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName,
|
||||||
) *RoomserverInternalAPI {
|
) *RoomserverInternalAPI {
|
||||||
serverACLs := acls.NewServerACLs(roomserverDB)
|
serverACLs := acls.NewServerACLs(roomserverDB)
|
||||||
|
producer := &producers.RoomEventProducer{
|
||||||
|
Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent)),
|
||||||
|
JetStream: js,
|
||||||
|
ACLs: serverACLs,
|
||||||
|
}
|
||||||
a := &RoomserverInternalAPI{
|
a := &RoomserverInternalAPI{
|
||||||
ProcessContext: processCtx,
|
ProcessContext: processCtx,
|
||||||
DB: roomserverDB,
|
DB: roomserverDB,
|
||||||
|
@ -68,8 +74,8 @@ func NewRoomserverAPI(
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
PerspectiveServerNames: perspectiveServerNames,
|
PerspectiveServerNames: perspectiveServerNames,
|
||||||
InputRoomEventTopic: inputRoomEventTopic,
|
InputRoomEventTopic: inputRoomEventTopic,
|
||||||
OutputRoomEventTopic: outputRoomEventTopic,
|
OutputProducer: producer,
|
||||||
JetStream: consumer,
|
JetStream: js,
|
||||||
NATSClient: nc,
|
NATSClient: nc,
|
||||||
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
|
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
|
||||||
ServerACLs: serverACLs,
|
ServerACLs: serverACLs,
|
||||||
|
@ -92,19 +98,19 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
|
||||||
r.KeyRing = keyRing
|
r.KeyRing = keyRing
|
||||||
|
|
||||||
r.Inputer = &input.Inputer{
|
r.Inputer = &input.Inputer{
|
||||||
Cfg: r.Cfg,
|
Cfg: r.Cfg,
|
||||||
ProcessContext: r.ProcessContext,
|
ProcessContext: r.ProcessContext,
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
InputRoomEventTopic: r.InputRoomEventTopic,
|
InputRoomEventTopic: r.InputRoomEventTopic,
|
||||||
OutputRoomEventTopic: r.OutputRoomEventTopic,
|
OutputProducer: r.OutputProducer,
|
||||||
JetStream: r.JetStream,
|
JetStream: r.JetStream,
|
||||||
NATSClient: r.NATSClient,
|
NATSClient: r.NATSClient,
|
||||||
Durable: nats.Durable(r.Durable),
|
Durable: nats.Durable(r.Durable),
|
||||||
ServerName: r.Cfg.Matrix.ServerName,
|
ServerName: r.Cfg.Matrix.ServerName,
|
||||||
FSAPI: fsAPI,
|
FSAPI: fsAPI,
|
||||||
KeyRing: keyRing,
|
KeyRing: keyRing,
|
||||||
ACLs: r.ServerACLs,
|
ACLs: r.ServerACLs,
|
||||||
Queryer: r.Queryer,
|
Queryer: r.Queryer,
|
||||||
}
|
}
|
||||||
r.Inviter = &perform.Inviter{
|
r.Inviter = &perform.Inviter{
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
|
@ -199,7 +205,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
|
||||||
if len(outputEvents) == 0 {
|
if len(outputEvents) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
|
return r.OutputProducer.ProduceRoomEvents(req.Event.RoomID(), outputEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomserverInternalAPI) PerformLeave(
|
func (r *RoomserverInternalAPI) PerformLeave(
|
||||||
|
@ -215,7 +221,7 @@ func (r *RoomserverInternalAPI) PerformLeave(
|
||||||
if len(outputEvents) == 0 {
|
if len(outputEvents) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return r.WriteOutputEvents(req.RoomID, outputEvents)
|
return r.OutputProducer.ProduceRoomEvents(req.RoomID, outputEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomserverInternalAPI) PerformForget(
|
func (r *RoomserverInternalAPI) PerformForget(
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/acls"
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/producers"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
@ -37,16 +38,8 @@ import (
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/tidwall/gjson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var keyContentFields = map[string]string{
|
|
||||||
"m.room.join_rules": "join_rule",
|
|
||||||
"m.room.history_visibility": "history_visibility",
|
|
||||||
"m.room.member": "membership",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inputer is responsible for consuming from the roomserver input
|
// Inputer is responsible for consuming from the roomserver input
|
||||||
// streams and processing the events. All input events are queued
|
// streams and processing the events. All input events are queued
|
||||||
// into a single NATS stream and the order is preserved strictly.
|
// into a single NATS stream and the order is preserved strictly.
|
||||||
|
@ -75,19 +68,19 @@ var keyContentFields = map[string]string{
|
||||||
// up, so they will do nothing until a new event comes in for B
|
// up, so they will do nothing until a new event comes in for B
|
||||||
// or C.
|
// or C.
|
||||||
type Inputer struct {
|
type Inputer struct {
|
||||||
Cfg *config.RoomServer
|
Cfg *config.RoomServer
|
||||||
ProcessContext *process.ProcessContext
|
ProcessContext *process.ProcessContext
|
||||||
DB storage.Database
|
DB storage.Database
|
||||||
NATSClient *nats.Conn
|
NATSClient *nats.Conn
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
Durable nats.SubOpt
|
Durable nats.SubOpt
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
FSAPI fedapi.RoomserverFederationAPI
|
FSAPI fedapi.RoomserverFederationAPI
|
||||||
KeyRing gomatrixserverlib.JSONVerifier
|
KeyRing gomatrixserverlib.JSONVerifier
|
||||||
ACLs *acls.ServerACLs
|
ACLs *acls.ServerACLs
|
||||||
InputRoomEventTopic string
|
InputRoomEventTopic string
|
||||||
OutputRoomEventTopic string
|
OutputProducer *producers.RoomEventProducer
|
||||||
workers sync.Map // room ID -> *worker
|
workers sync.Map // room ID -> *worker
|
||||||
|
|
||||||
Queryer *query.Queryer
|
Queryer *query.Queryer
|
||||||
}
|
}
|
||||||
|
@ -370,58 +363,6 @@ func (r *Inputer) InputRoomEvents(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOutputEvents implements OutputRoomEventWriter
|
|
||||||
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
|
|
||||||
var err error
|
|
||||||
for _, update := range updates {
|
|
||||||
msg := &nats.Msg{
|
|
||||||
Subject: r.OutputRoomEventTopic,
|
|
||||||
Header: nats.Header{},
|
|
||||||
}
|
|
||||||
msg.Header.Set(jetstream.RoomID, roomID)
|
|
||||||
msg.Data, err = json.Marshal(update)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
logger := log.WithFields(log.Fields{
|
|
||||||
"room_id": roomID,
|
|
||||||
"type": update.Type,
|
|
||||||
})
|
|
||||||
if update.NewRoomEvent != nil {
|
|
||||||
eventType := update.NewRoomEvent.Event.Type()
|
|
||||||
logger = logger.WithFields(log.Fields{
|
|
||||||
"event_type": eventType,
|
|
||||||
"event_id": update.NewRoomEvent.Event.EventID(),
|
|
||||||
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
|
|
||||||
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
|
|
||||||
"send_as_server": update.NewRoomEvent.SendAsServer,
|
|
||||||
"sender": update.NewRoomEvent.Event.Sender(),
|
|
||||||
})
|
|
||||||
if update.NewRoomEvent.Event.StateKey() != nil {
|
|
||||||
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
|
|
||||||
}
|
|
||||||
contentKey := keyContentFields[eventType]
|
|
||||||
if contentKey != "" {
|
|
||||||
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
|
|
||||||
if value.Exists() {
|
|
||||||
logger = logger.WithField("content_value", value.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
|
|
||||||
ev := update.NewRoomEvent.Event.Unwrap()
|
|
||||||
defer r.ACLs.OnServerACLUpdate(ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
|
|
||||||
if _, err := r.JetStream.PublishMsg(msg); err != nil {
|
|
||||||
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Namespace: "dendrite",
|
Namespace: "dendrite",
|
||||||
|
|
|
@ -381,7 +381,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
case api.KindOld:
|
case api.KindOld:
|
||||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeOldRoomEvent,
|
Type: api.OutputTypeOldRoomEvent,
|
||||||
OldRoomEvent: &api.OutputOldRoomEvent{
|
OldRoomEvent: &api.OutputOldRoomEvent{
|
||||||
|
@ -400,7 +400,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
// so notify downstream components to redact this event - they should have it if they've
|
// so notify downstream components to redact this event - they should have it if they've
|
||||||
// been tracking our output log.
|
// been tracking our output log.
|
||||||
if redactedEventID != "" {
|
if redactedEventID != "" {
|
||||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
err = r.OutputProducer.ProduceRoomEvents(event.RoomID(), []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeRedactedEvent,
|
Type: api.OutputTypeRedactedEvent,
|
||||||
RedactedEvent: &api.OutputRedactedEvent{
|
RedactedEvent: &api.OutputRedactedEvent{
|
||||||
|
|
|
@ -192,7 +192,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||||
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||||
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||||
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
|
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID(), updates); err != nil {
|
||||||
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -219,7 +219,7 @@ func (r *Admin) PerformAdminEvacuateUser(
|
||||||
if len(outputEvents) == 0 {
|
if len(outputEvents) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := r.Inputer.WriteOutputEvents(roomID, outputEvents); err != nil {
|
if err := r.Inputer.OutputProducer.ProduceRoomEvents(roomID, outputEvents); err != nil {
|
||||||
res.Error = &api.PerformError{
|
res.Error = &api.PerformError{
|
||||||
Code: api.PerformErrorBadRequest,
|
Code: api.PerformErrorBadRequest,
|
||||||
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
|
Msg: fmt.Sprintf("r.Inputer.WriteOutputEvents: %s", err),
|
||||||
|
|
|
@ -113,7 +113,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
||||||
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
|
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
|
err = r.Inputer.OutputProducer.ProduceRoomEvents(request.RoomID, []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeNewInboundPeek,
|
Type: api.OutputTypeNewInboundPeek,
|
||||||
NewInboundPeek: &api.OutputNewInboundPeek{
|
NewInboundPeek: &api.OutputNewInboundPeek{
|
||||||
|
|
|
@ -207,7 +207,7 @@ func (r *Peeker) performPeekRoomByID(
|
||||||
|
|
||||||
// TODO: handle federated peeks
|
// TODO: handle federated peeks
|
||||||
|
|
||||||
err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
|
err = r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeNewPeek,
|
Type: api.OutputTypeNewPeek,
|
||||||
NewPeek: &api.OutputNewPeek{
|
NewPeek: &api.OutputNewPeek{
|
||||||
|
|
|
@ -96,7 +96,7 @@ func (r *Unpeeker) performUnpeekRoomByID(
|
||||||
|
|
||||||
// TODO: handle federated peeks
|
// TODO: handle federated peeks
|
||||||
|
|
||||||
err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{
|
err = r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeRetirePeek,
|
Type: api.OutputTypeRetirePeek,
|
||||||
RetirePeek: &api.OutputRetirePeek{
|
RetirePeek: &api.OutputRetirePeek{
|
||||||
|
|
89
roomserver/producers/roomevent.go
Normal file
89
roomserver/producers/roomevent.go
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package producers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
var keyContentFields = map[string]string{
|
||||||
|
"m.room.join_rules": "join_rule",
|
||||||
|
"m.room.history_visibility": "history_visibility",
|
||||||
|
"m.room.member": "membership",
|
||||||
|
}
|
||||||
|
|
||||||
|
type RoomEventProducer struct {
|
||||||
|
Topic string
|
||||||
|
ACLs *acls.ServerACLs
|
||||||
|
JetStream nats.JetStreamContext
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error {
|
||||||
|
var err error
|
||||||
|
for _, update := range updates {
|
||||||
|
msg := &nats.Msg{
|
||||||
|
Subject: r.Topic,
|
||||||
|
Header: nats.Header{},
|
||||||
|
}
|
||||||
|
msg.Header.Set(jetstream.RoomID, roomID)
|
||||||
|
msg.Data, err = json.Marshal(update)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logger := log.WithFields(log.Fields{
|
||||||
|
"room_id": roomID,
|
||||||
|
"type": update.Type,
|
||||||
|
})
|
||||||
|
if update.NewRoomEvent != nil {
|
||||||
|
eventType := update.NewRoomEvent.Event.Type()
|
||||||
|
logger = logger.WithFields(log.Fields{
|
||||||
|
"event_type": eventType,
|
||||||
|
"event_id": update.NewRoomEvent.Event.EventID(),
|
||||||
|
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
|
||||||
|
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
|
||||||
|
"send_as_server": update.NewRoomEvent.SendAsServer,
|
||||||
|
"sender": update.NewRoomEvent.Event.Sender(),
|
||||||
|
})
|
||||||
|
if update.NewRoomEvent.Event.StateKey() != nil {
|
||||||
|
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
|
||||||
|
}
|
||||||
|
contentKey := keyContentFields[eventType]
|
||||||
|
if contentKey != "" {
|
||||||
|
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
|
||||||
|
if value.Exists() {
|
||||||
|
logger = logger.WithField("content_value", value.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
|
||||||
|
ev := update.NewRoomEvent.Event.Unwrap()
|
||||||
|
defer r.ACLs.OnServerACLUpdate(ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.Tracef("Producing to topic '%s'", r.Topic)
|
||||||
|
if _, err := r.JetStream.PublishMsg(msg); err != nil {
|
||||||
|
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.Topic, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -55,7 +55,6 @@ func NewInternalAPI(
|
||||||
return internal.NewRoomserverAPI(
|
return internal.NewRoomserverAPI(
|
||||||
base.ProcessContext, cfg, roomserverDB, js, nc,
|
base.ProcessContext, cfg, roomserverDB, js, nc,
|
||||||
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
||||||
cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
|
|
||||||
base.Caches, perspectiveServerNames,
|
base.Caches, perspectiveServerNames,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue